"""Gradio front-end for the YourBench Space.

The module wires three tabs (Setup, Run Generation, Evaluate) to a per-session
subprocess manager, and launches the Gradio app when executed.
"""

import asyncio
import os
import sys
import time
import uuid
from pathlib import Path

import gradio as gr
from datasets import load_dataset
from huggingface_hub import HfApi, whoami
from loguru import logger

from yourbench_space.config import generate_and_save_config
from yourbench_space.evaluation import create_eval_file, run_evaluations
from yourbench_space.leaderboard_space.env import HF_TOKEN
from yourbench_space.utils import (
    STAGES,
    SubprocessManagerGroup,
    is_running_locally,
    save_files,
    update_dataset,
)

project_description = """
# YourBench 🚀
**Dynamic Benchmark Generation for Language Models**

Quickly create zero-shot benchmarks from your documents – keeping models accurate and adaptable
- 📖 [FAQ](#)
- 💻 [GitHub](https://github.com/huggingface/yourbench/tree/v0.2-alpha-space)
"""

logger.remove()
logger.add(sys.stderr, level="INFO")

# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()

# Maps a username to the session uuid most recently assigned to that user,
# so that a returning user can be re-attached to a still-running process.
USER_ID_SESSION_MAP: dict[str, str] = dict()

docs_path = Path(__file__).parent / "docs.md"
citation_content = (
    # Keep only what follows the last "# Citation" heading of the docs file.
    docs_path.read_text(encoding="utf-8").split("# Citation")[-1].strip()
    if docs_path.exists()
    else "# Citation\n\nDocumentation file not found."
)


def _resolve_session_uid(session_state):
    """Return the session uuid carried by ``session_state``.

    Event handlers normally receive the *value* stored in a ``gr.State``
    (here: the uuid string), but some call sites may hand over the
    ``gr.State`` component itself. Both forms are accepted; ``None`` is
    passed through unchanged.
    """
    if isinstance(session_state, gr.State):
        return session_state.value
    return session_state


def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
    """Generate the config file for this session and expose it for download.

    Args:
        hf_org: Organization (or user) name selected in the UI.
        hf_dataset_name: Name of the dataset to create.
        session_state: Session uuid, either raw or wrapped in a ``gr.State``.

    Returns:
        A ``(log message, update for the download component)`` tuple.
    """
    failure = (
        "❌ Config generation failed.",
        gr.update(visible=False, interactive=False),
    )

    session_uid = _resolve_session_uid(session_state)
    manager = MANAGERS.get(session_uid)
    if manager is None:  # should not be possible
        return failure

    config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)

    # Poll for the file for up to 5 * 0.5s before reporting a failure.
    for _ in range(5):
        time.sleep(0.5)
        if config_path.exists():
            return (
                "✅ Config saved!",
                gr.update(value=str(config_path), visible=True, interactive=True),
            )

    return failure


final_dataset = None


def update_process_status(session_state: gr.State):
    """Update process status and include exit details if process has terminated"""
    session_uid = _resolve_session_uid(session_state)
    if session_uid is None:
        return gr.update(value=False, label="Not running")

    manager = MANAGERS.get(session_uid)
    if manager is None:
        return gr.update(value=False, label="Not running")

    if manager.is_running():
        return gr.update(value=True, label="Process Status: Running")

    exit_code, exit_reason = manager.get_exit_details()
    if exit_reason:
        status_text = f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}"
    else:
        status_text = "Process Status: Stopped"
    return gr.update(value=False, label=status_text)


def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
    """Start the generation subprocess for a session with a tailored environment.

    Args:
        session_uid: Identifier of the session whose process is started.
        oauth_token: Token of the logged-in user (injected by Gradio), or None.
        hf_dataset_name: Exported to the subprocess as ``DATASET_PREFIX``.
        _: Unused; kept so existing callers passing a fourth argument still work.
    """
    if oauth_token is None and not is_running_locally():
        gr.Warning('You need to log in to use this Space')
        return

    new_env = os.environ.copy()
    if oauth_token:
        new_env["HF_TOKEN"] = oauth_token.token
    new_env["DATASET_PREFIX"] = hf_dataset_name

    MANAGERS.start_process(session_uid, custom_env=new_env)


def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
    """Fill the organization dropdown with the user's name and organizations.

    Returns an empty dropdown when nobody is logged in or when the user
    information cannot be retrieved.
    """
    if oauth_token is None:
        return gr.Dropdown([], label="Organization")

    try:
        user_info = whoami(oauth_token.token)
        org_names = [org["name"] for org in user_info.get("orgs", [])]
        user_name = user_info.get("name", "Unknown User")
        # The user's own namespace comes first and is selected by default.
        org_names.insert(0, user_name)
        return gr.Dropdown(org_names, value=user_name, label="Organization")
    except Exception as e:
        # Best effort: the UI stays usable, but the failure is no longer silent.
        logger.warning(f"Could not retrieve organizations: {e}")
        return gr.Dropdown([], label="Organization")


def switch_to_run_generation_tab():
    """Select the "Run Generation" tab (id=1)."""
    return gr.Tabs(selected=1)


def enable_button(files):
    """Make the target button interactive only when ``files`` is non-empty."""
    return gr.update(interactive=bool(files))


def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name):
    """Run the evaluations for a dataset and create its leaderboard Space.

    Args:
        oauth_token: Token of the logged-in user (injected by Gradio), or None.
        org_name: Organization (or user) owning the dataset.
        eval_name: Name of the evaluation dataset.

    Returns:
        A status string for the UI. When the leaderboard creation fails, the
        evaluation status and the error are both reported.
    """
    # Every Hub call below needs the user's token.
    if oauth_token is None:
        gr.Warning('You need to log in to use this Space')
        return "You need to log in to use this Space"

    token = oauth_token.token
    eval_ds_name = f"{org_name}/{eval_name}"

    # Test dataset existence
    try:
        load_dataset(eval_ds_name, streaming=True, token=token)
    except Exception as e:
        message = f"Error while loading the dataset: {e}"
        logger.error(message)
        return message

    # Run evaluations
    create_eval_file(eval_ds_name)
    status = asyncio.run(run_evaluations(eval_ds_name=eval_ds_name, org=org_name))

    # Create space
    repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
    api = HfApi()

    try:
        api.create_repo(repo_id=repo_id, repo_type="space", space_sdk="gradio", token=token)
        api.upload_folder(repo_id=repo_id, repo_type="space", folder_path="src/", token=token)
        api.add_space_secret(repo_id=repo_id, key="HF_TOKEN", value=token, token=token)
        api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=token)
        api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=token)
    except Exception as e:
        # f-string: neither `status` nor `e` is guaranteed to be a str.
        status = f"Evaluation{status}\nLeaderboard creation:{e}"

    return status


def init_session(profile: gr.OAuthProfile | None):
    """Update session on load"""
    if is_running_locally():
        username = "local"
    elif profile:
        username = profile.username
    else:
        username = None

    # Reuse the uuid previously assigned to this user, if any.
    local_uuid = USER_ID_SESSION_MAP.get(username, str(uuid.uuid4()))

    if manager := MANAGERS.get(local_uuid):
        if manager.is_running():
            logger.info(f"Found existing running session for {local_uuid}, restoring")
            return gr.State(local_uuid, delete_callback=MANAGERS.remove)

        logger.info(f"Found existing stale session for {local_uuid}, starting new")
        MANAGERS.remove(local_uuid)
        local_uuid = str(uuid.uuid4())

    if username:
        USER_ID_SESSION_MAP[username] = local_uuid

    MANAGERS.create(local_uuid)
    logger.info(f"Started session for {local_uuid}")
    return gr.State(local_uuid, delete_callback=MANAGERS.remove)


with gr.Blocks(theme=gr.themes.Default()) as app:
    # We initialize the session state with the user randomly generated uuid
    # Using uuid4 makes collision cases extremely unlikely even for concurrent users
    session_state = gr.State()

    gr.Markdown(project_description)

    with gr.Tabs() as tabs:
        with gr.Tab("Setup", id=0):
            with gr.Row():
                with gr.Accordion("Hugging Face Settings"):
                    login_btn = gr.LoginButton()
                    hf_org_dropdown = gr.Dropdown(
                        choices=[], label="Organization", allow_custom_value=True
                    )
                    app.load(
                        update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown
                    )

                    hf_dataset_name = gr.Textbox(
                        label="Dataset name",
                        value="yourbench",
                        info="Name of your new evaluation dataset",
                    )

                with gr.Accordion("Upload documents"):
                    file_input = gr.File(
                        label="Upload text files",
                        file_count="multiple",
                        file_types=[".txt", ".md", ".html", ".pdf"],
                    )
                    output = gr.Textbox(label="Log")
                    file_input.upload(
                        save_files,
                        inputs=[session_state, file_input],
                        outputs=output,
                    )

            preview_button = gr.Button("Generate New Config", interactive=False)
            log_message = gr.Textbox(label="Log Message", visible=True)
            download_button = gr.File(
                label="Download Config", visible=False, interactive=False
            )

            file_input.change(enable_button, inputs=file_input, outputs=preview_button)

            preview_button.click(
                generate_and_return,
                inputs=[hf_org_dropdown, hf_dataset_name, session_state],
                outputs=[log_message, download_button],
            )
            preview_button.click(
                switch_to_run_generation_tab,
                inputs=None,
                outputs=tabs,
            )

        with gr.Tab("Run Generation", id=1):
            with gr.Row():
                start_button = gr.Button("Start Task")
                # The OAuth token is injected by Gradio from the type hint of
                # `prepare_task`; listing the login button as an input would
                # shift the remaining arguments by one position.
                start_button.click(prepare_task, inputs=[session_state, hf_dataset_name])

                stop_button = gr.Button("Stop Task")
                stop_button.click(MANAGERS.stop_process, inputs=session_state)

                kill_button = gr.Button("Kill Task")
                kill_button.click(MANAGERS.kill_process, inputs=session_state)

            with gr.Row():
                with gr.Column():
                    with gr.Accordion("Log Output", open=True):
                        log_output = gr.Code(language=None, lines=20, interactive=False)

                    process_status = gr.Checkbox(label="Process Status", interactive=False)
                    status_timer = gr.Timer(2.0, active=True)
                    status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)

                with gr.Column():
                    with gr.Accordion("Stages", open=True):
                        stages_table = gr.CheckboxGroup(
                            choices=STAGES,
                            value=[],
                            label="Pipeline Stages Completed",
                            interactive=False,
                        )

                    with gr.Accordion("Ingestion"):
                        ingestion_df = gr.DataFrame()

                    with gr.Accordion("Summarization"):
                        summarization_df = gr.DataFrame()

                    with gr.Accordion("Single-Hop"):
                        single_hop = gr.DataFrame()

                    with gr.Accordion("Answer Generation"):
                        answers_df = gr.DataFrame()

                    stages_table.change(
                        update_dataset,
                        inputs=[stages_table, hf_org_dropdown, hf_dataset_name],
                        outputs=[ingestion_df, summarization_df, single_hop, answers_df],
                    )

            # TODO: this timer should only be active when the second tab is passed to active for the first time
            log_timer = gr.Timer(1.0, active=True)
            log_timer.tick(
                MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table]
            )

        with gr.Tab("Evaluate", id=2, visible=False):
            with gr.Row():
                btn_launch_evals = gr.Button("Launch evaluations")
                status = gr.Textbox(label="Status")
                btn_launch_evals.click(run_evaluation_pipeline, [hf_org_dropdown, hf_dataset_name], status)

    app.load(init_session, outputs=session_state)

app.launch(allowed_paths=["/app"])