import json import logging import os import time import pandas as pd from huggingface_hub import snapshot_download from src.envs import DATA_PATH, H4_TOKEN, RESULTS_REPO, METAINFO_REPO # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") def time_diff_wrapper(func): def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() diff = end_time - start_time logging.info("Time taken for %s: %s seconds", func.__name__, diff) return result return wrapper def chmod_recursive(path, mode): os.chmod(path, mode) for root, dirs, files in os.walk(path): for dir in dirs: os.chmod(os.path.join(root, dir), mode) for file in files: os.chmod(os.path.join(root, file), mode) @time_diff_wrapper def download_dataset(repo_id, local_dir, repo_type="dataset", max_attempts=3, backoff_factor=1.5): """Download dataset with exponential backoff retries.""" os.makedirs(local_dir, exist_ok=True) os.makedirs('./tmp', exist_ok=True) attempt = 0 while attempt < max_attempts: try: logging.info("Downloading %s to %s", repo_id, local_dir) snapshot_download( repo_id=repo_id, local_dir=local_dir, cache_dir='./tmp', repo_type=repo_type, tqdm_class=None, token=H4_TOKEN, etag_timeout=30, max_workers=8, force_download=True, local_dir_use_symlinks=False ) logging.info("Download successful") return except Exception as e: wait_time = backoff_factor**attempt logging.error("Error downloading %s: %s, retrying in %ss", repo_id, e, wait_time) time.sleep(wait_time) attempt += 1 logging.error("Failed to download %s after %s attempts", repo_id, max_attempts) def download_openbench(): """ Скачивает необходимые данные для лидерборда из репозиториев HuggingFace """ # Скачиваем метаданные лидерборда try: download_dataset(METAINFO_REPO, DATA_PATH) logging.info("Successfully downloaded leaderboard metainfo data") except Exception as e: logging.error(f"Failed to download leaderboard metainfo: {e}") # Скачиваем результаты моделей try: download_dataset(RESULTS_REPO, "m_data") logging.info("Successfully downloaded model evaluation results") except Exception as e: logging.error(f"Failed to download model evaluation results: {e}") def build_leadearboard_df(): """ Функция для сбора данных лидерборда из всех доступных источников. Гарантирует, что в лидерборде будет только одна запись для каждой модели (с наивысшим score). """ results = [] best_model_results = {} # Словарь для отслеживания лучших результатов моделей # 1. Пытаемся загрузить данные из метаинформации лидерборда try: leaderboard_path = os.path.join(DATA_PATH, "leaderboard.json") if os.path.exists(leaderboard_path): with open(leaderboard_path, "r", encoding="utf-8") as eval_file: saved_data = json.load(eval_file) if saved_data: logging.info(f"Loaded {len(saved_data)} models from saved leaderboard data") # Обрабатываем каждую модель, сохраняя только лучший результат for item in saved_data: try: # Получаем имя модели, проверяя разные возможные ключи model_name = item.get("model_name", item.get("model", "")) if not model_name: continue # Стандартизируем данные model_data = { "model": model_name, "score": float(item.get("score", 0.0)), "math_score": float(item.get("math_score", 0.0)), "physics_score": float(item.get("physics_score", 0.0)), "total_tokens": int(item.get("total_tokens", 0)), "evaluation_time": float(item.get("evaluation_time", 0.0)), "system_prompt": item.get("system_prompt", "Вы - полезный помощник по математике и физике. Ответьте на русском языке.") } # Определяем, является ли это лучшим результатом для данной модели model_base_name = model_name.split("/")[-1].split("_v")[0] if model_base_name in best_model_results: if model_data["score"] > best_model_results[model_base_name]["score"]: best_model_results[model_base_name] = model_data else: best_model_results[model_base_name] = model_data except KeyError as e: # Логируем ошибку, но продолжаем обработку других моделей logging.error(f"Failed to process model data: {e}") except Exception as e: logging.error(f"Failed to load saved leaderboard data: {e}") # 2. Загружаем модели из директории внешних моделей try: external_dir = "./m_data/model_data/external/" if os.path.exists(external_dir): for file in os.listdir(external_dir): if file.endswith(".json"): try: with open(os.path.join(external_dir, file), "r", encoding="utf-8") as f: data = json.load(f) # Конвертируем данные из любого формата в формат DeathMath converted_data = convert_old_format_to_deatmath(data) # Проверяем наличие необходимых полей после конвертации model_name = converted_data.get("model_name", converted_data.get("model", "")) if not model_name: logging.error(f"Failed to parse {file}: 'model_name' not found after conversion") continue # Стандартизируем данные model_data = { "model": model_name, "score": float(converted_data.get("score", 0.0)), "math_score": float(converted_data.get("math_score", 0.0)), "physics_score": float(converted_data.get("physics_score", 0.0)), "total_tokens": int(converted_data.get("total_tokens", 0)), "evaluation_time": float(converted_data.get("evaluation_time", 0.0)), "system_prompt": converted_data.get("system_prompt", "Вы - полезный помощник по математике и физике. Ответьте на русском языке.") } # Определяем, является ли это лучшим результатом для данной модели model_base_name = model_name.split("/")[-1].split("_v")[0] if model_base_name in best_model_results: if model_data["score"] > best_model_results[model_base_name]["score"]: best_model_results[model_base_name] = model_data else: best_model_results[model_base_name] = model_data except Exception as e: logging.error(f"Failed to parse {file}: {str(e)}") continue except Exception as e: logging.error(f"Failed to process external model data: {e}") # 3. Собираем все лучшие результаты results = list(best_model_results.values()) # 4. Добавляем базовые модели по умолчанию, если список пуст if not results: # Добавляем несколько моделей-заглушек для отображения интерфейса results = [ { "model": "example/model-1", "score": 0.7, "math_score": 0.8, "physics_score": 0.6, "total_tokens": 1000000, "evaluation_time": 3600.0, "system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке." }, { "model": "example/model-2", "score": 0.6, "math_score": 0.7, "physics_score": 0.5, "total_tokens": 800000, "evaluation_time": 3000.0, "system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке." } ] logging.warning("No model data found, using example models") # Создаем DataFrame и сортируем по общему баллу df = pd.DataFrame(results) df.sort_values(by='score', ascending=False, inplace=True) # Округляем числовые столбцы для красивого отображения numeric_cols = df.select_dtypes(include=['number']).columns if not numeric_cols.empty: df[numeric_cols] = df[numeric_cols].round(3) return df def convert_old_format_to_deatmath(data): """ Конвертирует данные из старого формата Small Shlepa в формат DeathMath Args: data (dict): Данные модели в старом формате Returns: dict: Конвертированные данные в формате DeathMath """ # Проверяем, возможно это файл уже в формате DeathMath if "score" in data: return data # Проверяем формат Small Shlepa с полями: musicmc, moviesmc, booksmc, lawmc, mmluproru small_shlepa_fields = ["musicmc", "moviesmc", "booksmc", "lawmc", "mmluproru", "model"] is_shlepa_format = any(field in data for field in small_shlepa_fields) if is_shlepa_format: logging.info(f"Конвертация модели из формата Small Shlepa в формат DeathMath: {data.get('model', 'Unknown')}") # Конвертируем данные с примерным соответствием: # math_score = среднее(musicmc, booksmc, mmluproru) # physics_score = lawmc или moviesmc math_score = 0.0 math_components = 0 if "musicmc" in data and data["musicmc"] is not None: math_score += float(data["musicmc"]) math_components += 1 if "booksmc" in data and data["booksmc"] is not None: math_score += float(data["booksmc"]) math_components += 1 if "mmluproru" in data and data["mmluproru"] is not None: math_score += float(data["mmluproru"]) math_components += 1 if math_components > 0: math_score /= math_components # Для physics_score используем значение lawmc или moviesmc (что доступно) physics_score = 0.0 if "lawmc" in data and data["lawmc"] is not None: physics_score = float(data["lawmc"]) elif "moviesmc" in data and data["moviesmc"] is not None: physics_score = float(data["moviesmc"]) # Общий скор - среднее арифметическое avg_score = (math_score + physics_score) / 2 if math_score or physics_score else 0.0 converted_data = { "model_name": data.get("model", "Unknown"), "score": avg_score, "math_score": math_score, "physics_score": physics_score, "total_tokens": int(data.get("total_tokens", 0)), "evaluation_time": float(data.get("evaluation_time", 0.0)), "system_prompt": data.get("system_prompt", "Вы - полезный помощник по математике и физике. Ответьте на русском языке.") } return converted_data # Если формат неизвестен, возвращаем стандартный шаблон logging.warning(f"Неизвестный формат данных модели, использую шаблон") return { "model_name": data.get("model_name", data.get("model", "Unknown")), "score": 0.0, "math_score": 0.0, "physics_score": 0.0, "total_tokens": 0, "evaluation_time": 0.0, "system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке." }