"""Create a resource group for a model repository and wire it up.

The script makes sure the repository exists (creating it when necessary),
creates a resource group named after it, attaches the repository to that
group, and finally grants the repository creator the ``admin`` role on it.

Configuration is read from the environment (optionally via a ``.env`` file):
``HF_TOKEN``, ``ORG_NAME`` and ``BASE_URL``.
"""

import requests
from environs import Env

# Initialize environment
env = Env()
env.read_env()  # Load variables from a .env file, if one is found

# Set your token (in a real environment, get this from env vars)
HF_TOKEN = env("HF_TOKEN")
ORG_NAME = env("ORG_NAME")
BASE_URL = env("BASE_URL")

# Seconds to wait for the API before giving up. Without an explicit timeout
# `requests` waits indefinitely, so one stalled connection would hang the script.
REQUEST_TIMEOUT = 30

# HTTP Headers for API requests
headers = {
    "Authorization": f"Bearer {HF_TOKEN}",
    "Content-Type": "application/json",
}


# First things first: create the repo (or use existing one)
def create_repository(org_name: str, repo_name: str) -> bool:
    """Create the repository, or reuse it if it already exists.

    Args:
        org_name: Organization that owns the repository.
        repo_name: Name of the repository inside the organization.

    Returns:
        True if the repository exists once this call finishes (it was already
        there, or it was created), False if creation failed.

    Raises:
        requests.RequestException: If a request cannot be completed
            (connection error, timeout, ...).
    """
    # First check if it already exists
    repo_url = f"{BASE_URL}/models/{org_name}/{repo_name}"
    check_response = requests.get(
        repo_url, headers=headers, timeout=REQUEST_TIMEOUT
    )
    if check_response.status_code == 200:
        print(f"Good news! Repository {org_name}/{repo_name} already exists!")
        return True

    # Try to create it if it doesn't exist
    create_url = f"{BASE_URL}/repos/create"
    print(f"Houston, stand by: Creating repo with URL: {create_url}")
    data = {
        "type": "model",
        "name": repo_name,
        "organization": org_name,
        "private": True,
    }
    response = requests.post(
        create_url, headers=headers, json=data, timeout=REQUEST_TIMEOUT
    )
    if response.status_code == 200:
        print(f"Woohoo!!! Created repository: {org_name}/{repo_name}")
        return True

    # Check if the error is because the repo already exists. The error body is
    # not guaranteed to be JSON (e.g. a proxy error page), so parse defensively
    # instead of letting a decode error escape from a failure path.
    try:
        error_data = response.json()
    except ValueError:
        error_data = None
    error_message = (
        error_data.get("error") if isinstance(error_data, dict) else None
    )
    if isinstance(error_message, str) and "already created" in error_message:
        print(f"Repository {org_name}/{repo_name} already exists! Moving on.")
        return True

    print(
        "Son of a Lambda Cold Start! "
        f"Failed to create repository: {response.text}"
    )
    return False


def _extract_resource_group_id(response):
    """Return the ``id`` from a resource-group creation response, or None.

    None is returned (after printing a diagnostic) when the status code is
    not 200/201, the body is not valid JSON, or the body has no usable ``id``.
    """
    if response.status_code not in (200, 201):
        return None

    try:
        response_data = response.json()
    except ValueError as e:
        print(f"Error parsing response: {e}")
        return None

    rg_id = response_data.get("id") if isinstance(response_data, dict) else None
    if rg_id is None:
        print("Response has unexpected format - missing 'id' field")
        return None

    print(f"Resource group successfully summoned! ID: {rg_id}")
    return rg_id


def create_resource_group_for_repo(repo_name: str, creator_username: str) -> bool:
    """Create a resource group for a repository and attach the repository to it.

    The repository is created first if it does not exist yet. The creator is
    then added to the group as ``admin``; a failure of that last step is
    reported but does not make the whole operation fail.

    Args:
        repo_name: Name of the repository inside ``ORG_NAME``.
        creator_username: User to be granted the ``admin`` role on the group.

    Returns:
        True if the resource group was created and the repository attached to
        it, False if the repository, the group, or the attachment failed.

    Raises:
        requests.RequestException: If a request cannot be completed
            (connection error, timeout, ...).
    """
    # Step 0: Make the repo first or we're going nowhere fast
    if not create_repository(ORG_NAME, repo_name):
        print(
            "Hey, yo! Slow your roll! "
            "Can't make a resource group for a repo that doesn't exist!"
        )
        return False

    # Step 1: Create our glorious resource group
    rg_name = f"rg-{repo_name}"  # Simpler name, less chance for API to throw a fit
    rg_url = f"{BASE_URL}/organizations/{ORG_NAME}/resource-groups"
    print(f"Conjuring a resource group with URL: {rg_url}")

    # Let's be direct - just create it with minimal data
    data = {
        "name": rg_name,
        "description": (
            f"Automagically created for {repo_name} by the Wizard of Automation"
        ),
    }

    # Throw the request into the void and pray for a 200 or 201
    response = requests.post(
        rg_url, headers=headers, json=data, timeout=REQUEST_TIMEOUT
    )

    # Print full response for debugging (you know it will happen)
    print(
        f"Resource group creation response: "
        f"{response.status_code} - {response.text}"
    )

    # The body is parsed exactly once; the ID is reused for every later step.
    rg_id = _extract_resource_group_id(response)
    if rg_id is None:
        print(f"Son of a Nutcracker! Resource group creation failed: {response.text}")
        return False

    # We're in business! Grab that ID
    print(f"I stole, I mean grabbed that Resource Group ID because I need it: {rg_id}")

    # Step 2: Connect the repo to our shiny new resource group
    repo_url = f"{BASE_URL}/models/{ORG_NAME}/{repo_name}/resource-group"
    repo_data = {"resourceGroupId": rg_id}
    repo_response = requests.post(
        repo_url, headers=headers, json=repo_data, timeout=REQUEST_TIMEOUT
    )
    if repo_response.status_code != 200:
        print(f"Son of an API! Failed to connect repo: {repo_response.text}")
        return False

    print(f"Repository {repo_name} has joined the resource group party!")

    # Step 3: Make the creator an admin - they brought this repo into the
    # world after all
    member_url = f"{BASE_URL}/organizations/{ORG_NAME}/resource-groups/{rg_id}/users"
    member_data = {
        "users": [
            {
                "user": creator_username,
                "role": "admin",
            }
        ]
    }
    member_response = requests.post(
        member_url, headers=headers, json=member_data, timeout=REQUEST_TIMEOUT
    )
    if member_response.status_code != 200:
        print(
            "Son of a broken websocket! "
            f"Creator couldn't be added: {member_response.text}"
        )
        # We'll continue anyway - the group and repo connection are working
    else:
        print(
            f"Creator {creator_username} is now the admin overlord "
            "of this resource group!"
        )

    # Victory dance
    return True


# Demo time: This simulates what would happen when a new repo is created
def main() -> None:
    """Run the demo flow for a hard-coded repository and creator."""
    repo_name = "testing-magic"
    creator = "joshhayles"

    print(f"A WIZARD created a new Repo: {ORG_NAME}/{repo_name}")
    result = create_resource_group_for_repo(repo_name, creator)

    if result:
        print("Ohh, snap! Successfully set up resource group for the new repository!")
    else:
        print("Damn. Failed to set up resource group")


if __name__ == "__main__":
    main()