"""Gradio front-end for YourBench: upload documents, run the benchmark
generation pipeline, and deploy a leaderboard Space for evaluations."""

import os
import sys
import time
import uuid
import asyncio
from pathlib import Path

from loguru import logger
import gradio as gr
from datasets import load_dataset
from huggingface_hub import HfApi, whoami

from yourbench_space import PATH
from yourbench_space.utils import (
    STAGES,
    SubprocessManagerGroup,
    save_files,
    update_dataset,
    map_stage_names,
    is_running_locally,
    # NOTE: the upstream helper name is misspelled ("succsess"); alias it so the
    # rest of this file reads correctly without changing the utils module.
    on_generation_succsess as on_generation_success,
)
from yourbench_space.config import generate_and_save_config
from yourbench_space.evaluation import run_evaluations, create_eval_file


project_description = """
# 🚀 YourBench
### Dynamic Benchmark Generation from Your Documents

- Create zero-shot benchmarks from your documents — no manual labeling
- Evaluate top open models and publish a leaderboard in one click
- Run locally or explore the [source on GitHub](https://github.com/huggingface/yourbench)

⚠️ **Important:** This app uses your Hugging Face token for inference and uploads — you are responsible for any usage costs

Built with 🤗 by the [Hugging Face OpenEvals team](https://huggingface.co/OpenEvals)
"""

logger.remove()
logger.add(sys.stderr, level="INFO")

# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()
# Maps a username to its most recent session UUID so a returning user can
# reattach to a still-running pipeline.
USER_ID_SESSION_MAP: dict[str, str] = {}

docs_path = Path(__file__).parent / "docs.md"
citation_content = (
    docs_path.read_text().split("# Citation")[-1].strip()
    if docs_path.exists()
    else "# Citation\n\nDocumentation file not found."
)


def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
    """Generate the pipeline config file for this session and expose it for download.

    Returns a (status message, gr.update for the download widget) pair.
    """
    manager = MANAGERS.get(session_state)
    if manager is None:  # should not be possible
        return (
            "❌ Config generation failed",
            gr.update(visible=False, interactive=False),
        )
    session_uid = session_state.value
    config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)
    # Poll briefly for the config file to appear on disk before giving up.
    for _ in range(5):
        time.sleep(0.5)
        if config_path.exists():
            # BUGFIX: gr.Success/gr.Error are exception classes that only display
            # when *raised*; calling them was a silent no-op. gr.Info/gr.Warning
            # display toast messages when called, which matches the intent here.
            gr.Info("✅ Config generated successfully!")
            return (
                "✅ Config saved successfully!",
                gr.update(value=str(config_path), visible=True, interactive=True),
            )

    gr.Warning("Failed to generate config")
    return (
        "❌ Config generation failed",
        gr.update(visible=False, interactive=False),
    )


# NOTE(review): appears unused within this file — kept for backward compatibility
# in case external code reads it; confirm before removing.
final_dataset = None


def update_process_status(session_state: gr.State):
    """Update process status and include exit details if process has terminated"""
    if session_state is None:
        return gr.update(value=False, label="Not running")

    manager = MANAGERS.get(session_state.value)
    if manager is None:
        return gr.update(value=False, label="Not running")

    is_running = manager.is_running()

    if not is_running:
        exit_code, exit_reason = manager.get_exit_details()
        status_text = (
            f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}"
            if exit_reason
            else "Process Status: Stopped"
        )
        return gr.update(value=False, label=status_text)

    return gr.update(value=True, label="Process Status: Running")


def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
    """Start the benchmark pipeline subprocess for this session.

    The trailing ``_`` parameter absorbs an extra positional input that Gradio
    may pass from the click wiring.
    """
    if oauth_token is None and not is_running_locally():
        gr.Warning("You need to log in to use this Space")
        return

    # Propagate the user's token and dataset prefix to the child process.
    new_env = os.environ.copy()
    if oauth_token:
        new_env["HF_TOKEN"] = oauth_token.token
    new_env["DATASET_PREFIX"] = hf_dataset_name
    MANAGERS.start_process(session_uid, custom_env=new_env)


def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
    """Populate the organization dropdown with the user's orgs, user first."""
    if oauth_token is None:
        return gr.Dropdown([], label="Organization")
    try:
        user_info = whoami(oauth_token.token)
        org_names = [org["name"] for org in user_info.get("orgs", [])]
        user_name = user_info.get("name", "Unknown User")
        org_names.insert(0, user_name)
        return gr.Dropdown(org_names, value=user_name, label="Organization")
    except Exception as e:
        # BUGFIX: the failure was swallowed silently; log it so a bad/expired
        # token is diagnosable.
        logger.error(f"Failed to fetch Hugging Face user info: {e}")
        return gr.Dropdown([], label="Organization")


def switch_to_run_generation_tab():
    """Select the 'Run Benchmark Pipeline' tab (id=1)."""
    return gr.Tabs(selected=1)


def enable_button(files):
    """Enable the downstream button only when at least one file is present."""
    return gr.update(interactive=bool(files))


def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name, config_name="lighteval"):
    """Run Lighteval on the generated dataset and deploy a leaderboard Space.

    Returns a human-readable status string describing success or the first
    failure encountered.
    """
    # BUGFIX: oauth_token.token was previously dereferenced inside the dataset
    # try-block before any None check, so a missing login surfaced as a
    # misleading "Dataset loading error". Guard explicitly instead.
    if oauth_token is None:
        logger.error("No OAuth token available; user must be logged in to run evaluations")
        return "❌ Failed: Dataset loading error"

    eval_ds_name = f"{org_name}/{eval_name}"
    repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
    folder_path = str(Path(PATH) / "yourbench_space" / "leaderboard_space")

    # Sanity-check that the evaluation dataset actually exists and is readable.
    try:
        load_dataset(eval_ds_name, name=config_name, streaming=True, token=oauth_token.token)
    except Exception as e:
        logger.error(f"Failed to load dataset '{eval_ds_name}': {e}")
        return "❌ Failed: Dataset loading error"

    new_env = os.environ.copy()
    if oauth_token:
        new_env["HF_TOKEN"] = oauth_token.token

    try:
        create_eval_file(eval_ds_name)
        asyncio.run(run_evaluations(org=org_name, eval_ds_name=eval_ds_name, custom_env=new_env))
    except Exception as e:
        logger.error(f"Evaluation error: {e}")
        return f"❌ Failed: Evaluation error\n{e}"

    api = HfApi()
    space_was_regenerated = False

    try:
        api.create_repo(
            repo_id=repo_id,
            repo_type="space",
            space_sdk="gradio",
            token=oauth_token.token,
        )
    except Exception as e:
        # A 409 means the Space already exists: delete and recreate it so the
        # leaderboard reflects the fresh evaluation.
        if "409" in str(e) and "already created this space repo" in str(e):
            logger.info(f"Space '{repo_id}' already exists. Deleting and regenerating it.")
            try:
                api.delete_repo(repo_id=repo_id, repo_type="space", token=oauth_token.token)
                api.create_repo(
                    repo_id=repo_id,
                    repo_type="space",
                    space_sdk="gradio",
                    token=oauth_token.token,
                )
                space_was_regenerated = True
            except Exception as delete_err:
                logger.error(f"Failed to delete and recreate space '{repo_id}': {delete_err}")
                return f"✅ Evaluation succeeded\n❌ Failed: Could not recreate space\n{delete_err}"
        else:
            logger.error(f"Space creation error: {e}")
            return f"✅ Evaluation succeeded\n❌ Failed: Space creation error\n{e}"

    try:
        api.upload_folder(
            repo_id=repo_id,
            repo_type="space",
            folder_path=folder_path,
            token=oauth_token.token,
        )
        api.add_space_secret(
            repo_id=repo_id,
            key="HF_TOKEN",
            value=oauth_token.token,
            token=oauth_token.token,
        )
        api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=oauth_token.token)
        api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=oauth_token.token)
    except Exception as e:
        logger.error(f"Failed during space setup: {e}")
        return f"✅ Evaluation succeeded\n❌ Failed: Space setup error\n{e}"

    if space_was_regenerated:
        return f"✅ Evaluation succeeded\n🔁 Space '{repo_id}' was regenerated successfully"

    return f"✅ Evaluation and Space creation completed successfully for: {repo_id}"


def init_session(profile: gr.OAuthProfile | None):
    """Update session on load"""
    if is_running_locally():
        username = "local"
    elif profile:
        username = profile.username
    else:
        username = None

    local_uuid = USER_ID_SESSION_MAP.get(username, str(uuid.uuid4()))

    # Reattach to a still-running pipeline for this user; discard stale sessions.
    if manager := MANAGERS.get(local_uuid):
        if manager.is_running():
            logger.info(f"Found existing running session for {local_uuid}, restoring")
            return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
        else:
            logger.info(f"Found existing stale session for {local_uuid}, starting new")
            MANAGERS.remove(local_uuid)
            local_uuid = str(uuid.uuid4())

    if username:
        USER_ID_SESSION_MAP[username] = local_uuid

    MANAGERS.create(local_uuid)
    logger.info(f"Started session for {local_uuid}")
    return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))


# Created outside the Blocks context and rendered later inside the third tab.
btn_launch_evals = gr.Button(
    "🚀 Launch Evaluation",
    visible=True,
    interactive=False,  # Start non-interactive
    variant="primary",
)

with gr.Blocks(theme=gr.themes.Default()) as app:
    session_state = gr.State()
    gr.Markdown(project_description)

    with gr.Tabs() as tabs:
        with gr.Tab("Choose Documents & Settings", id=0):
            with gr.Column():
                gr.Markdown("### 📄 Choose your documents and settings")
                gr.Markdown(
                    "Upload your source documents that will form the knowledge base for your benchmark. Set a Hugging Face organization and dataset name."
                )
                gr.Markdown(
                    "This step also generates a config file for running the benchmark pipeline. You can download it to run YourBench locally."
                )
                with gr.Row():
                    with gr.Accordion("Hugging Face Settings"):
                        login_btn = gr.LoginButton()
                        hf_org_dropdown = gr.Dropdown(choices=[], label="Organization", allow_custom_value=True)
                        app.load(update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown)
                        hf_dataset_name = gr.Textbox(
                            label="Dataset name",
                            value="yourbench",
                            info="Name of your new evaluation dataset",
                        )
                    with gr.Accordion("Upload Files"):
                        file_input = gr.File(
                            label="Upload text files",
                            file_count="multiple",
                            file_types=[".txt", ".md", ".html", ".pdf"],
                        )
                        output = gr.Textbox(label="Log")
                        file_input.upload(
                            save_files,
                            inputs=[session_state, file_input],
                            outputs=output,
                        )
                        delete_button = gr.Button("Delete Uploaded Files", visible=False)

                preview_button = gr.Button("Generate New Config", interactive=False)
                log_message = gr.Textbox(label="Log Message", visible=True)
                download_button = gr.File(label="Download Config", visible=False, interactive=False)

                file_input.change(
                    lambda files: gr.update(visible=bool(files)),
                    inputs=file_input,
                    outputs=delete_button,
                )
                file_input.change(enable_button, inputs=file_input, outputs=preview_button)

                def clean_and_confirm(uid):
                    """Delete the session's uploaded files and reset the upload UI."""
                    MANAGERS.clean_workdir(uid)
                    return (
                        "🗑️ All uploaded files have been deleted!",
                        gr.update(value=None),
                        gr.update(interactive=False),
                    )

                delete_button.click(
                    clean_and_confirm,
                    inputs=session_state,
                    outputs=[output, file_input, preview_button],
                )

                preview_button.click(
                    generate_and_return,
                    inputs=[hf_org_dropdown, hf_dataset_name, session_state],
                    outputs=[log_message, download_button],
                )
                preview_button.click(
                    switch_to_run_generation_tab,
                    inputs=None,
                    outputs=tabs,
                )

        with gr.Tab("Run Benchmark Pipeline", id=1):
            with gr.Column():
                gr.Markdown("### ⚙️ Run the benchmark generation pipeline")
                gr.Markdown(
                    "Start the pipeline to process documents, generate questions, and build the private evaluation dataset. Watch logs, track progress, and preview the results."
                )
                with gr.Row():
                    start_button = gr.Button("Start Task")
                    stop_button = gr.Button("Stop Task")
                    kill_button = gr.Button("Kill Task")

                start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name])
                stop_button.click(MANAGERS.stop_process, inputs=session_state)
                kill_button.click(MANAGERS.kill_process, inputs=session_state)

                process_status = gr.Checkbox(label="Process Status", interactive=False)
                status_timer = gr.Timer(2.0, active=True)
                status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)

                with gr.Row():
                    with gr.Accordion("Stages", open=True):
                        stages_table = gr.CheckboxGroup(
                            choices=map_stage_names(STAGES),
                            value=[],
                            label="Pipeline Stages Completed",
                            container=False,
                            interactive=False,
                        )

                with gr.Row():
                    with gr.Column():
                        with gr.Accordion("Log Output", open=True):
                            log_output = gr.Code(language=None, lines=20, interactive=False)
                    with gr.Column():
                        with gr.Accordion("Ingestion Preview"):
                            ingestion_df = gr.DataFrame()
                        with gr.Accordion("Summarization Preview"):
                            summarization_df = gr.DataFrame()
                        with gr.Accordion("Single Shot Preview"):
                            single_shot_df = gr.DataFrame()
                        with gr.Accordion("Multi Hop Preview"):
                            multi_hop_df = gr.DataFrame()
                        with gr.Accordion("Lighteval Preview"):
                            lighteval_df = gr.DataFrame()

                stages_table.change(
                    update_dataset,
                    inputs=[stages_table, hf_org_dropdown, hf_dataset_name],
                    outputs=[ingestion_df, summarization_df, single_shot_df, multi_hop_df, lighteval_df],
                )

                stages_table.change(
                    on_generation_success,
                    inputs=stages_table,
                    outputs=[tabs, btn_launch_evals],
                )

                # TODO: this timer should only be active when the second tab is passed to active for the first time
                log_timer = gr.Timer(1.0, active=True)
                log_timer.tick(
                    MANAGERS.read_and_get_output,
                    inputs=session_state,
                    outputs=[log_output, stages_table],
                )

        with gr.Tab("Evaluate Models on Benchmark", id=2):
            with gr.Column():
                gr.Markdown("### 🧪 Evaluate models on your benchmark")
                gr.Markdown(
                    "Runs the evaluation with [Lighteval](https://github.com/huggingface/lighteval) on the resulted dataset using 5+ open models, then deploys a leaderboard as a Hugging Face Space under your org."
                )
                with gr.Row():
                    with gr.Column():
                        btn_launch_evals.render()
                    with gr.Column():
                        clear_status_btn = gr.Button("Clear", variant="secondary")

                with gr.Accordion("Evaluation Log", open=True):
                    eval_status = gr.Textbox(label="", lines=6, interactive=False, show_label=False)

                btn_launch_evals.click(
                    run_evaluation_pipeline,
                    [hf_org_dropdown, hf_dataset_name, gr.State("lighteval")],
                    eval_status,
                )
                clear_status_btn.click(lambda: "", outputs=eval_status)

    app.load(init_session, outputs=session_state)

app.launch(allowed_paths=[PATH])