import asyncio, os

from yourbench_space.leaderboard_space.env import INIT_MODELS

ON_SPACES = os.environ.get("system") == "spaces"
OUTPUT_DIR = "/data" if ON_SPACES else "."


def create_eval_file(eval_ds_name):
    # TODO: replace by Nathan's call
    # Write a lighteval custom-task module for eval_ds_name to OUTPUT_DIR/custom_task.py.
    content = """
from aenum import extend_enum

from lighteval.metrics.metrics import Metrics
from lighteval.metrics.utils.metric_utils import (
    CorpusLevelMetricGrouping,
    MetricCategory,
    MetricUseCase,
)
from lighteval.tasks.lighteval_task import LightevalTaskConfig
from lighteval.tasks.extended.hle.main import JudgeLLMHLE
from lighteval.tasks.requests import Doc


def prompt_function(line, task_name: str = None):
    # Skip multimodal rows: only text-only questions are evaluated.
    if line["image"] not in [None, ""]:
        return
    return Doc(
        task_name=task_name,
        query="Question: " + line["question"] + "\\nAnswer:",
        choices=[line["answer"]],
        gold_index=0,
        specific={"question": line["question"]},
    )
""" + f"""
hle = LightevalTaskConfig(
    name="{eval_ds_name.replace('/', '_')}",
    suite=["custom"],
    prompt_function=prompt_function,
    hf_repo="{eval_ds_name}",
    hf_subset="default",
    hf_avail_splits=["test"],
    evaluation_splits=["test"],
    few_shots_split=None,
    few_shots_select=None,
    generation_size=8192,
    metric=[Metrics.exact_match],
    stop_sequence=[],
    trust_dataset=True,
    version=0,
)

TASKS_TABLE = [hle]
"""

    with open(f"{OUTPUT_DIR}/custom_task.py", "w") as f:
        f.write(content)


async def run_process(args: list) -> dict:
    # Launch the command and capture stdout/stderr. communicate() drains the
    # pipes while waiting for exit, which avoids the deadlock that
    # wait() followed by read() can cause when a pipe buffer fills up.
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=180)
    return {
        "pid": process.pid,
        "stdout": stdout.decode(),
        "stderr": stderr.decode(),
    }


async def run_evaluations(eval_ds_name: str, org: str) -> str:
    # Run one lighteval evaluation per configured model, all in parallel.
    tasks = []
    for model_name, provider in INIT_MODELS:
        args = [
            "lighteval",
            "endpoint",
            "inference-providers",
            f"model={model_name},provider={provider}",
            f"custom|{eval_ds_name.replace('/', '_')}|0|0",
            "--custom-tasks",
            f"{OUTPUT_DIR}/custom_task.py",
            "--max-samples", "10",
            "--output-dir", f"{OUTPUT_DIR}",
            "--save-details",
            "--results-org", org,
            "--push-to-hub",
        ]
        tasks.append(run_process(args))
    # return_exceptions=True: a failed or timed-out run is returned as an
    # exception object instead of cancelling the other evaluations.
    processes = await asyncio.gather(*tasks, return_exceptions=True)
    if all(not isinstance(result, Exception) for result in processes):
        return "✅"
    return "At least one model failed"
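

# Example usage (a minimal sketch, not part of the module's public flow; the
# dataset name and org below are hypothetical placeholders):
#
#     if __name__ == "__main__":
#         ds = "my-org/my-eval-dataset"
#         create_eval_file(ds)
#         print(asyncio.run(run_evaluations(ds, org="my-org")))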