# MinCPionS / hfspaces_tracking.py
# KoRiF, commit 6791083 ("improve data lifecycle")
import sqlite3
import os
from huggingface_hub import HfApi
import requests
import huggingface_hub
from huggingface_hub.hf_api import SpaceInfo
from typing import List, Dict, Any, Union
import json
# Test if huggingface_hub is properly imported
print("HuggingFace Hub version:", huggingface_hub.__version__)
#print("HuggingFace Hub API version:", HfApi.__version__)
# Use persistent storage if available
DB_PATH = '/data/huggingface_spaces.db' if os.path.exists('/data') else 'huggingface_spaces.db'
SQL_CREATE_SPACES = 'sql/create_spaces.sql'
SQL_UPDATE_SPACES = 'sql/update_spaces.sql'
SQL_CREATE_ENDPOINTS = 'sql/create_endpoints.sql'
SQL_UPDATE_ENDPOINTS = 'sql/update_endpoints.sql'
SQL_CREATE_TOOLS = 'sql/create_tools.sql'
SQL_UPDATE_TOOLS = 'sql/update_tools.sql'
from sql.sql_utils import load_sql_query, is_database_outdated, update_db_timestamp, create_metadata_table
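# Note: the SQL files referenced above are not part of this module. Judging from the
# cursor.execute() calls below, the update queries are assumed to be parameterized as:
#   spaces:    (id, title, author, description, likes, url, tags, last_modified, private)
#   endpoints: (space_id, endpoint_type, url)
#   tools:     (space_id, tool_name, description, properties_json)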
def create_database():
    """Initialize database if needed"""
    # Load and execute table creation SQL
    create_metadata_table(DB_PATH)
    query_create_spaces = load_sql_query(SQL_CREATE_SPACES)
    query_create_endpoints = load_sql_query(SQL_CREATE_ENDPOINTS)
    query_create_tools = load_sql_query(SQL_CREATE_TOOLS)
    with sqlite3.connect(DB_PATH) as conn:
        conn.executescript(query_create_spaces)
        conn.executescript(query_create_endpoints)
        conn.executescript(query_create_tools)
        conn.commit()
def generate_endpoint_urls(space_id: str) -> Dict[str, str]:
    """Generate potential MCP endpoint URLs for a space"""
    # Convert "author/space-name" to "author-space-name"
    subdomain = space_id.replace('/', '-').replace('_', '-').lower()
    return {
        "sse": f"https://{subdomain}.hf.space/gradio_api/mcp/sse",
        "schema": f"https://{subdomain}.hf.space/gradio_api/mcp/schema"
    }
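# Illustrative example of the mapping above (hypothetical space id):
#   "Some-Author/My_MCP_Space" -> subdomain "some-author-my-mcp-space"
#   sse:    https://some-author-my-mcp-space.hf.space/gradio_api/mcp/sse
#   schema: https://some-author-my-mcp-space.hf.space/gradio_api/mcp/schema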
def check_endpoint_availability(url: str) -> bool:
    """Check if endpoint exists and returns a valid response"""
    try:
        response = requests.head(url, timeout=5, allow_redirects=True)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False
def normalize_tool_format(tool_data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> List[tuple[str, str, Dict]]:
    """
    Normalize different tool formats into a consistent format.
    Returns list of tuples: (tool_name, description, properties)
    """
    result = []
    if isinstance(tool_data, list):
        # Handle list format
        for tool in tool_data:
            if name := tool.get('name'):
                result.append((
                    name,
                    tool.get('description', ''),
                    tool.get('inputSchema', {})
                ))
    else:
        # Handle dictionary format
        for name, data in tool_data.items():
            result.append((
                name,
                data.get('description', ''),
                data
            ))
    return result
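# The normalizer above accepts either shape (illustrative only, not an exhaustive
# spec of the Gradio MCP schema payload):
#   list form: [{"name": "tool_a", "description": "...", "inputSchema": {...}}, ...]
#   dict form: {"tool_a": {"description": "...", ...}, ...}
# and always yields (tool_name, description, properties) tuples.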
def fetch_and_parse_schema(url: str) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
    """Fetch and parse tool schema from endpoint"""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError):
        return []
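# On success this returns whatever JSON the schema endpoint serves (a list of tool
# definitions or a dict keyed by tool name, see normalize_tool_format above); on any
# network or parse error it returns an empty list so callers can simply bail out.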
def save_endpoints_and_tools(conn: sqlite3.Connection, space_id: str):
    """Discover and store endpoints and tools for a space (commit is left to the caller)"""
    cursor = conn.cursor()
    endpoint_urls = generate_endpoint_urls(space_id)
    # Check and save endpoints
    query_update_endpoints = load_sql_query(SQL_UPDATE_ENDPOINTS)
    for endpoint_type, url in endpoint_urls.items():
        if check_endpoint_availability(url):
            cursor.execute(query_update_endpoints, (space_id, endpoint_type, url))
    # Process schema endpoint if available
    if 'schema' in endpoint_urls and check_endpoint_availability(endpoint_urls['schema']):
        tools_data = fetch_and_parse_schema(endpoint_urls['schema'])
        if not tools_data:
            return
        # Convert to normalized format and save
        query_update_tools = load_sql_query(SQL_UPDATE_TOOLS)
        for tool_name, description, properties in normalize_tool_format(tools_data):
            try:
                cursor.execute(query_update_tools, (
                    space_id,
                    tool_name,
                    description,
                    json.dumps(properties)
                ))
            except Exception as e:
                print(f"Error saving tool {tool_name} for space {space_id}: {e}")
                continue
def fetch_spaces() -> List[Dict[str, Any]]:
    """
    Fetch spaces using the Hugging Face API with enhanced filtering and model card access
    """
    api = HfApi()
    spaces = []
    try:
        # Get Gradio spaces whose metadata mentions "mcp-server"
        for space in api.list_spaces(
            filter="gradio",
            search="mcp-server",
            limit=100,
            full=True,  # Get full metadata
        ):
            try:
                # Get detailed space information
                space_info: SpaceInfo = api.space_info(repo_id=space.id)
                # Extract model card information (may be missing for some spaces)
                model_card = space_info.cardData or {}
                # Get tags and additional metadata
                tags = model_card.get("tags", [])
                description = model_card.get("description", "")
                title = space.id.split("/")[-1]
                author = space.id.split("/")[0]
                likes = space_info.likes
                url = f"https://huggingface.co/spaces/{space.id}"
                spaces.append({
                    'id': space.id,
                    'title': title,
                    'author': author,
                    'description': description,
                    'likes': likes,
                    'url': url,
                    #'model_card': model_card,
                    'tags': ' '.join(tags) if tags else None,
                    'last_modified': space_info.lastModified,
                    'private': space_info.private,
                })
            except Exception as e:
                print(f"Error fetching space info for {space.id}: {e}")
                continue
        return spaces
    except Exception as e:
        print(f"Error fetching spaces: {e}")
        return []
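# Each element returned by fetch_spaces() has this shape (illustrative values):
# {
#     'id': 'author/space-name', 'title': 'space-name', 'author': 'author',
#     'description': '...', 'likes': 0,
#     'url': 'https://huggingface.co/spaces/author/space-name',
#     'tags': 'gradio mcp-server', 'last_modified': <datetime>, 'private': False,
# }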
def save_to_database(spaces: List[Dict[str, Any]]):
    """Save spaces data to database and process endpoints and tools"""
    # Load SQL queries
    query_update_spaces = load_sql_query(SQL_UPDATE_SPACES)
    # Create database connection
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    try:
        for space in spaces:
            try:
                # Execute insert/update query for the space itself
                cursor.execute(query_update_spaces, (
                    space['id'],
                    space['title'],
                    space['author'],
                    space['description'],
                    space['likes'],
                    space['url'],
                    space['tags'],
                    space['last_modified'],
                    space['private']
                ))
                # Process endpoints and tools for each space
                save_endpoints_and_tools(conn, space['id'])
            except sqlite3.IntegrityError as e:
                print(f"Error saving space {space['title']}: {e}")
                continue
            except Exception as e:
                print(f"Unexpected error processing space {space['title']}: {e}")
                continue
        conn.commit()
        print(f"Database saved at: {DB_PATH}")
        print(f"Processed {len(spaces)} spaces")
    finally:
        conn.close()
def update_database():
    """Update database if needed"""
    create_database()
    if not is_database_outdated(DB_PATH):
        print("Database is up to date")
        return False
    print("Starting fetching process...")
    spaces_data = fetch_spaces()
    save_to_database(spaces_data)
    # Update last fetch time
    update_db_timestamp(DB_PATH)
    print("Process complete! Data saved to database")
    return True

if __name__ == "__main__":
    update_database()