alozowski's picture
alozowski HF Staff
Update pyproject.toml and apply ruff
64a657c
import os
import sys
import time
import uuid
import asyncio
from pathlib import Path
from loguru import logger
import gradio as gr
from datasets import load_dataset
from huggingface_hub import HfApi, whoami
from yourbench_space import PATH
from yourbench_space.utils import (
STAGES,
SubprocessManagerGroup,
save_files,
update_dataset,
map_stage_names,
is_running_locally,
on_generation_succsess,
)
from yourbench_space.config import generate_and_save_config
from yourbench_space.evaluation import run_evaluations, create_eval_file
# Markdown shown at the top of the app (passed to gr.Markdown inside the Blocks layout).
project_description = """
# 🚀 YourBench
### Dynamic Benchmark Generation from Your Documents
- Create zero-shot benchmarks from your documents — no manual labeling
- Evaluate top open models and publish a leaderboard in one click
- Run locally or explore the [source on GitHub](https://github.com/huggingface/yourbench)
⚠️ **Important:** This app uses your Hugging Face token for inference and uploads — you are responsible for any usage costs
Built with 🤗 by the [Hugging Face OpenEvals team](https://huggingface.co/OpenEvals)
"""

# Drop the handlers currently registered on the logger and log to stderr at INFO level.
logger.remove()
logger.add(sys.stderr, level="INFO")

# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()

# Username -> session identifier assigned to that user by init_session.
USER_ID_SESSION_MAP: dict[str, str] = {}
# Citation text: whatever follows the last "# Citation" marker in docs.md
# (located next to this file), or a placeholder when the file is missing.
docs_path = Path(__file__).parent / "docs.md"
if docs_path.exists():
    # Explicit encoding so the result does not depend on the platform default.
    citation_content = docs_path.read_text(encoding="utf-8").split("# Citation")[-1].strip()
else:
    citation_content = "# Citation\n\nDocumentation file not found."
def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
    """Generate the pipeline config for the current session and offer it for download.

    Args:
        hf_org: Organization (or user) name forwarded to ``generate_and_save_config``.
        hf_dataset_name: Dataset name forwarded to ``generate_and_save_config``.
        session_state: Session state; used to look up the session's manager and
            to read the session identifier from its ``value`` attribute.

    Returns:
        A ``(log message, update for the download component)`` tuple.
    """

    def _failure():
        # Same outputs for every failure path: message + hidden download widget.
        return (
            "❌ Config generation failed",
            gr.update(visible=False, interactive=False),
        )

    manager = MANAGERS.get(session_state)
    if manager is None:  # should not be possible
        return _failure()

    session_uid = session_state.value
    config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)

    # Check right away, then retry a few times in case the file shows up late.
    retries = 5
    for attempt in range(retries + 1):
        if config_path.exists():
            gr.Success("✅ Config generated successfully!")
            return (
                "✅ Config saved successfully!",
                gr.update(value=str(config_path), visible=True, interactive=True),
            )
        if attempt < retries:
            time.sleep(0.5)

    # gr.Error is an exception class and only reaches the user when raised;
    # gr.Warning shows a notification while still letting us return the outputs.
    gr.Warning("Failed to generate config")
    return _failure()
# NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still needed.
final_dataset = None
def update_process_status(session_state: gr.State):
    """Build the checkbox update describing the session's process.

    The checkbox is ticked while the manager reports a running process. Once
    the process has stopped, the label carries the exit reason and exit code
    when a reason is available.
    """
    manager = MANAGERS.get(session_state.value) if session_state is not None else None
    if manager is None:
        return gr.update(value=False, label="Not running")

    if manager.is_running():
        return gr.update(value=True, label="Process Status: Running")

    exit_code, exit_reason = manager.get_exit_details()
    if exit_reason:
        label = f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}"
    else:
        label = "Process Status: Stopped"
    return gr.update(value=False, label=label)
def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
    """Start the session's process with an environment derived from the current one.

    The child environment is a copy of ``os.environ`` with ``DATASET_PREFIX``
    set to *hf_dataset_name* and, when a token is available, ``HF_TOKEN`` set
    to it. Without a token the task only starts when running locally;
    otherwise a warning is shown and nothing is started.
    """
    login_missing = oauth_token is None
    if login_missing and not is_running_locally():
        gr.Warning("You need to log in to use this Space")
        return

    process_env = dict(os.environ)
    if oauth_token:
        process_env["HF_TOKEN"] = oauth_token.token
    process_env["DATASET_PREFIX"] = hf_dataset_name

    MANAGERS.start_process(session_uid, custom_env=process_env)
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
    """Populate the organization dropdown from the logged-in user's account.

    Args:
        oauth_token: Token of the logged-in user, or ``None`` when not logged in.

    Returns:
        A ``gr.Dropdown`` listing the user's own name first (pre-selected)
        followed by their organizations, or an empty dropdown when there is
        no token or the lookup fails.
    """
    if oauth_token is None:
        return gr.Dropdown([], label="Organization")

    try:
        user_info = whoami(oauth_token.token)
        user_name = user_info.get("name", "Unknown User")
        org_names = [user_name, *(org["name"] for org in user_info.get("orgs", []))]
        return gr.Dropdown(org_names, value=user_name, label="Organization")
    except Exception as e:
        # Best effort: keep the UI usable, but leave a trace of what went wrong.
        logger.warning(f"Failed to fetch Hugging Face organizations: {e}")
        return gr.Dropdown([], label="Organization")
def switch_to_run_generation_tab():
    """Return a Tabs update that selects the tab registered with ``id=1``."""
    run_tab_id = 1
    return gr.Tabs(selected=run_tab_id)
def enable_button(files):
    """Return an update making the target interactive only when *files* is truthy."""
    has_files = bool(files)
    return gr.update(interactive=has_files)
def _create_leaderboard_space(api: HfApi, repo_id: str, token: str | None) -> None:
    """Create the Gradio Space repository *repo_id*."""
    api.create_repo(
        repo_id=repo_id,
        repo_type="space",
        space_sdk="gradio",
        token=token,
    )


def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name, config_name="lighteval"):
    """Evaluate models on the generated dataset and publish a leaderboard Space.

    Steps, in order:
      1. check that ``{org_name}/{eval_name}`` loads with *config_name*;
      2. create the eval file and run the evaluations;
      3. create the leaderboard Space (deleting and recreating it if it exists);
      4. upload the Space template folder and set its secret and variables.

    Args:
        oauth_token: Token of the logged-in user, or ``None`` when not logged in.
        org_name: Organization (or user) that owns the dataset and the Space.
        eval_name: Name of the evaluation dataset inside *org_name*.
        config_name: Dataset configuration to load for the sanity check.

    Returns:
        A human-readable status string describing success or the first failure.
    """
    # Resolve the token once; the parameter is optional, so it must not be
    # dereferenced unconditionally.
    token = oauth_token.token if oauth_token else None
    if token is None and not is_running_locally():
        return "❌ Failed: You need to log in to use this Space"

    eval_ds_name = f"{org_name}/{eval_name}"
    repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
    folder_path = str(Path(PATH) / "yourbench_space" / "leaderboard_space")

    try:
        load_dataset(eval_ds_name, name=config_name, streaming=True, token=token)
    except Exception as e:
        logger.error(f"Failed to load dataset '{eval_ds_name}': {e}")
        return "❌ Failed: Dataset loading error"

    new_env = os.environ.copy()
    if token:
        new_env["HF_TOKEN"] = token

    try:
        create_eval_file(eval_ds_name)
        asyncio.run(run_evaluations(org=org_name, eval_ds_name=eval_ds_name, custom_env=new_env))
    except Exception as e:
        logger.error(f"Evaluation error: {e}")
        return f"❌ Failed: Evaluation error\n{e}"

    api = HfApi()
    space_was_regenerated = False

    try:
        _create_leaderboard_space(api, repo_id, token)
    except Exception as e:
        error_text = str(e)
        already_exists = "409" in error_text and "already created this space repo" in error_text
        if not already_exists:
            logger.error(f"Space creation error: {e}")
            return f"✅ Evaluation succeeded\n❌ Failed: Space creation error\n{e}"

        logger.info(f"Space '{repo_id}' already exists. Deleting and regenerating it.")
        try:
            api.delete_repo(repo_id=repo_id, repo_type="space", token=token)
            _create_leaderboard_space(api, repo_id, token)
        except Exception as delete_err:
            logger.error(f"Failed to delete and recreate space '{repo_id}': {delete_err}")
            return f"✅ Evaluation succeeded\n❌ Failed: Could not recreate space\n{delete_err}"
        space_was_regenerated = True

    try:
        api.upload_folder(
            repo_id=repo_id,
            repo_type="space",
            folder_path=folder_path,
            token=token,
        )
        # Same token the evaluations ran with (the OAuth token when logged in,
        # otherwise whatever HF_TOKEN the local environment provides).
        space_secret = new_env.get("HF_TOKEN")
        if space_secret:
            api.add_space_secret(
                repo_id=repo_id,
                key="HF_TOKEN",
                value=space_secret,
                token=token,
            )
        else:
            logger.warning(f"No HF token available; Space '{repo_id}' was created without an HF_TOKEN secret")
        api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=token)
        api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=token)
    except Exception as e:
        logger.error(f"Failed during space setup: {e}")
        return f"✅ Evaluation succeeded\n❌ Failed: Space setup error\n{e}"

    if space_was_regenerated:
        return f"✅ Evaluation succeeded\n🔁 Space '{repo_id}' was regenerated successfully"

    return f"✅ Evaluation and Space creation completed successfully for: {repo_id}"
def init_session(profile: gr.OAuthProfile | None):
    """Resolve (or create) the session for the visitor when the app loads.

    A user with a still-running manager gets that session back. A stale
    manager is removed and replaced by a fresh session. The returned state
    removes its manager from ``MANAGERS`` through its delete callback.
    """

    def _state_for(session_id):
        # State wrapper whose delete callback removes the manager.
        return gr.State(session_id, delete_callback=lambda uid: MANAGERS.remove(uid))

    if is_running_locally():
        username = "local"
    else:
        username = profile.username if profile else None

    session_id = USER_ID_SESSION_MAP.get(username, str(uuid.uuid4()))

    existing = MANAGERS.get(session_id)
    if existing:
        if existing.is_running():
            logger.info(f"Found existing running session for {session_id}, restoring")
            return _state_for(session_id)

        logger.info(f"Found existing stale session for {session_id}, starting new")
        MANAGERS.remove(session_id)
        session_id = str(uuid.uuid4())

    if username:
        USER_ID_SESSION_MAP[username] = session_id

    MANAGERS.create(session_id)
    logger.info(f"Started session for {session_id}")
    return _state_for(session_id)
# Created before the Blocks layout so that handlers in the "Run Benchmark
# Pipeline" tab can target it; it is rendered later, in the evaluation tab.
btn_launch_evals = gr.Button(
    "🚀 Launch Evaluation",
    visible=True,
    interactive=False,  # Start non-interactive
    variant="primary",
)

with gr.Blocks(theme=gr.themes.Default()) as app:
    # Per-visitor session state, filled by init_session when the page loads.
    session_state = gr.State()

    gr.Markdown(project_description)

    with gr.Tabs() as tabs:
        # ---- Tab 0: upload documents and generate the config ----
        with gr.Tab("Choose Documents & Settings", id=0):
            with gr.Column():
                gr.Markdown("### 📄 Choose your documents and settings")
                gr.Markdown(
                    "Upload your source documents that will form the knowledge base for your benchmark. Set a Hugging Face organization and dataset name."
                )
                gr.Markdown(
                    "This step also generates a config file for running the benchmark pipeline. You can download it to run YourBench locally."
                )

                with gr.Row():
                    with gr.Accordion("Hugging Face Settings"):
                        login_btn = gr.LoginButton()
                        hf_org_dropdown = gr.Dropdown(choices=[], label="Organization", allow_custom_value=True)
                        app.load(update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown)

                        hf_dataset_name = gr.Textbox(
                            label="Dataset name",
                            value="yourbench",
                            info="Name of your new evaluation dataset",
                        )

                    with gr.Accordion("Upload Files"):
                        file_input = gr.File(
                            label="Upload text files",
                            file_count="multiple",
                            file_types=[".txt", ".md", ".html", ".pdf"],
                        )
                        output = gr.Textbox(label="Log")
                        file_input.upload(
                            save_files,
                            inputs=[session_state, file_input],
                            outputs=output,
                        )
                        delete_button = gr.Button("Delete Uploaded Files", visible=False)

                preview_button = gr.Button("Generate New Config", interactive=False)
                log_message = gr.Textbox(label="Log Message", visible=True)
                download_button = gr.File(label="Download Config", visible=False, interactive=False)

                # Show the delete button and enable config generation only while files are present.
                file_input.change(
                    lambda files: gr.update(visible=bool(files)),
                    inputs=file_input,
                    outputs=delete_button,
                )
                file_input.change(enable_button, inputs=file_input, outputs=preview_button)

                def clean_and_confirm(uid):
                    """Clean the session's working directory and reset the upload widgets."""
                    MANAGERS.clean_workdir(uid)
                    return (
                        "🗑️ All uploaded files have been deleted!",
                        gr.update(value=None),
                        gr.update(interactive=False),
                    )

                delete_button.click(
                    clean_and_confirm,
                    inputs=session_state,
                    outputs=[output, file_input, preview_button],
                )

                # Generating the config also moves the user on to the pipeline tab.
                preview_button.click(
                    generate_and_return,
                    inputs=[hf_org_dropdown, hf_dataset_name, session_state],
                    outputs=[log_message, download_button],
                )
                preview_button.click(
                    switch_to_run_generation_tab,
                    inputs=None,
                    outputs=tabs,
                )

        # ---- Tab 1: run the generation pipeline and watch its progress ----
        with gr.Tab("Run Benchmark Pipeline", id=1):
            with gr.Column():
                gr.Markdown("### ⚙️ Run the benchmark generation pipeline")
                gr.Markdown(
                    "Start the pipeline to process documents, generate questions, and build the private evaluation dataset. Watch logs, track progress, and preview the results."
                )

                with gr.Row():
                    start_button = gr.Button("Start Task")
                    stop_button = gr.Button("Stop Task")
                    kill_button = gr.Button("Kill Task")

                # prepare_task receives its gr.OAuthToken through the type hint, so the
                # login button must not be listed as an input: an extra input shifts the
                # positional arguments and hf_dataset_name would get the button's value.
                start_button.click(prepare_task, inputs=[session_state, hf_dataset_name])
                stop_button.click(MANAGERS.stop_process, inputs=session_state)
                kill_button.click(MANAGERS.kill_process, inputs=session_state)

                process_status = gr.Checkbox(label="Process Status", interactive=False)
                status_timer = gr.Timer(2.0, active=True)
                status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)

                with gr.Row():
                    with gr.Accordion("Stages", open=True):
                        stages_table = gr.CheckboxGroup(
                            choices=map_stage_names(STAGES),
                            value=[],
                            label="Pipeline Stages Completed",
                            container=False,
                            interactive=False,
                        )

                with gr.Row():
                    with gr.Column():
                        with gr.Accordion("Log Output", open=True):
                            log_output = gr.Code(language=None, lines=20, interactive=False)

                    with gr.Column():
                        with gr.Accordion("Ingestion Preview"):
                            ingestion_df = gr.DataFrame()

                        with gr.Accordion("Summarization Preview"):
                            summarization_df = gr.DataFrame()

                        with gr.Accordion("Single Shot Preview"):
                            single_shot_df = gr.DataFrame()

                        with gr.Accordion("Multi Hop Preview"):
                            multi_hop_df = gr.DataFrame()

                        with gr.Accordion("Lighteval Preview"):
                            lighteval_df = gr.DataFrame()

                # Whenever the completed-stages list changes, refresh the previews and
                # let on_generation_succsess update the tabs / evaluation button.
                stages_table.change(
                    update_dataset,
                    inputs=[stages_table, hf_org_dropdown, hf_dataset_name],
                    outputs=[ingestion_df, summarization_df, single_shot_df, multi_hop_df, lighteval_df],
                )
                stages_table.change(
                    on_generation_succsess,
                    inputs=stages_table,
                    outputs=[tabs, btn_launch_evals],
                )

                # TODO: this timer should only be active when the second tab is passed to active for the first time
                log_timer = gr.Timer(1.0, active=True)
                log_timer.tick(
                    MANAGERS.read_and_get_output,
                    inputs=session_state,
                    outputs=[log_output, stages_table],
                )

        # ---- Tab 2: evaluate models and publish the leaderboard ----
        with gr.Tab("Evaluate Models on Benchmark", id=2):
            with gr.Column():
                gr.Markdown("### 🧪 Evaluate models on your benchmark")
                gr.Markdown(
                    "Runs the evaluation with [Lighteval](https://github.com/huggingface/lighteval) on the resulted dataset using 5+ open models, then deploys a leaderboard as a Hugging Face Space under your org."
                )

                with gr.Row():
                    with gr.Column():
                        btn_launch_evals.render()
                    with gr.Column():
                        clear_status_btn = gr.Button("Clear", variant="secondary")

                with gr.Accordion("Evaluation Log", open=True):
                    eval_status = gr.Textbox(label="", lines=6, interactive=False, show_label=False)

                # The OAuth token argument of run_evaluation_pipeline is supplied by Gradio;
                # only the remaining three parameters are listed as inputs.
                btn_launch_evals.click(
                    run_evaluation_pipeline,
                    [hf_org_dropdown, hf_dataset_name, gr.State("lighteval")],
                    eval_status,
                )
                clear_status_btn.click(lambda: "", outputs=eval_status)

    app.load(init_session, outputs=session_state)

app.launch(allowed_paths=[PATH])