Spaces:
Running
Running
import json | |
import logging | |
import os | |
import time | |
import pandas as pd | |
from huggingface_hub import snapshot_download | |
from src.envs import DATA_PATH, H4_TOKEN, RESULTS_REPO, METAINFO_REPO | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
def time_diff_wrapper(func): | |
def wrapper(*args, **kwargs): | |
start_time = time.time() | |
result = func(*args, **kwargs) | |
end_time = time.time() | |
diff = end_time - start_time | |
logging.info("Time taken for %s: %s seconds", func.__name__, diff) | |
return result | |
return wrapper | |
def chmod_recursive(path, mode): | |
os.chmod(path, mode) | |
for root, dirs, files in os.walk(path): | |
for dir in dirs: | |
os.chmod(os.path.join(root, dir), mode) | |
for file in files: | |
os.chmod(os.path.join(root, file), mode) | |
def download_dataset(repo_id, local_dir, repo_type="dataset", max_attempts=3, backoff_factor=1.5): | |
"""Download dataset with exponential backoff retries.""" | |
os.makedirs(local_dir, exist_ok=True) | |
os.makedirs('./tmp', exist_ok=True) | |
attempt = 0 | |
while attempt < max_attempts: | |
try: | |
logging.info("Downloading %s to %s", repo_id, local_dir) | |
snapshot_download( | |
repo_id=repo_id, | |
local_dir=local_dir, | |
cache_dir='./tmp', | |
repo_type=repo_type, | |
tqdm_class=None, | |
token=H4_TOKEN, | |
etag_timeout=30, | |
max_workers=8, | |
force_download=True, | |
local_dir_use_symlinks=False | |
) | |
logging.info("Download successful") | |
return | |
except Exception as e: | |
wait_time = backoff_factor**attempt | |
logging.error("Error downloading %s: %s, retrying in %ss", repo_id, e, wait_time) | |
time.sleep(wait_time) | |
attempt += 1 | |
logging.error("Failed to download %s after %s attempts", repo_id, max_attempts) | |
def download_openbench(): | |
""" | |
Скачивает необходимые данные для лидерборда из репозиториев HuggingFace | |
""" | |
# Скачиваем метаданные лидерборда | |
try: | |
download_dataset(METAINFO_REPO, DATA_PATH) | |
logging.info("Successfully downloaded leaderboard metainfo data") | |
except Exception as e: | |
logging.error(f"Failed to download leaderboard metainfo: {e}") | |
# Скачиваем результаты моделей | |
try: | |
download_dataset(RESULTS_REPO, "m_data") | |
logging.info("Successfully downloaded model evaluation results") | |
except Exception as e: | |
logging.error(f"Failed to download model evaluation results: {e}") | |
def build_leadearboard_df(): | |
""" | |
Функция для сбора данных лидерборда из всех доступных источников. | |
Гарантирует, что в лидерборде будет только одна запись для каждой модели (с наивысшим score). | |
""" | |
results = [] | |
best_model_results = {} # Словарь для отслеживания лучших результатов моделей | |
# 1. Пытаемся загрузить данные из метаинформации лидерборда | |
try: | |
leaderboard_path = os.path.join(DATA_PATH, "leaderboard.json") | |
if os.path.exists(leaderboard_path): | |
with open(leaderboard_path, "r", encoding="utf-8") as eval_file: | |
saved_data = json.load(eval_file) | |
if saved_data: | |
logging.info(f"Loaded {len(saved_data)} models from saved leaderboard data") | |
# Обрабатываем каждую модель, сохраняя только лучший результат | |
for item in saved_data: | |
try: | |
# Получаем имя модели, проверяя разные возможные ключи | |
model_name = item.get("model_name", item.get("model", "")) | |
if not model_name: | |
continue | |
# Стандартизируем данные | |
model_data = { | |
"model": model_name, | |
"score": float(item.get("score", 0.0)), | |
"math_score": float(item.get("math_score", 0.0)), | |
"physics_score": float(item.get("physics_score", 0.0)), | |
"total_tokens": int(item.get("total_tokens", 0)), | |
"evaluation_time": float(item.get("evaluation_time", 0.0)), | |
"system_prompt": item.get("system_prompt", "Вы - полезный помощник по математике и физике. Ответьте на русском языке.") | |
} | |
# Определяем, является ли это лучшим результатом для данной модели | |
model_base_name = model_name.split("/")[-1].split("_v")[0] | |
if model_base_name in best_model_results: | |
if model_data["score"] > best_model_results[model_base_name]["score"]: | |
best_model_results[model_base_name] = model_data | |
else: | |
best_model_results[model_base_name] = model_data | |
except KeyError as e: | |
# Логируем ошибку, но продолжаем обработку других моделей | |
logging.error(f"Failed to process model data: {e}") | |
except Exception as e: | |
logging.error(f"Failed to load saved leaderboard data: {e}") | |
# 2. Загружаем модели из директории внешних моделей | |
try: | |
external_dir = "./m_data/model_data/external/" | |
if os.path.exists(external_dir): | |
for file in os.listdir(external_dir): | |
if file.endswith(".json"): | |
try: | |
with open(os.path.join(external_dir, file), "r", encoding="utf-8") as f: | |
data = json.load(f) | |
# Конвертируем данные из любого формата в формат DeathMath | |
converted_data = convert_old_format_to_deatmath(data) | |
# Проверяем наличие необходимых полей после конвертации | |
model_name = converted_data.get("model_name", converted_data.get("model", "")) | |
if not model_name: | |
logging.error(f"Failed to parse {file}: 'model_name' not found after conversion") | |
continue | |
# Стандартизируем данные | |
model_data = { | |
"model": model_name, | |
"score": float(converted_data.get("score", 0.0)), | |
"math_score": float(converted_data.get("math_score", 0.0)), | |
"physics_score": float(converted_data.get("physics_score", 0.0)), | |
"total_tokens": int(converted_data.get("total_tokens", 0)), | |
"evaluation_time": float(converted_data.get("evaluation_time", 0.0)), | |
"system_prompt": converted_data.get("system_prompt", | |
"Вы - полезный помощник по математике и физике. Ответьте на русском языке.") | |
} | |
# Определяем, является ли это лучшим результатом для данной модели | |
model_base_name = model_name.split("/")[-1].split("_v")[0] | |
if model_base_name in best_model_results: | |
if model_data["score"] > best_model_results[model_base_name]["score"]: | |
best_model_results[model_base_name] = model_data | |
else: | |
best_model_results[model_base_name] = model_data | |
except Exception as e: | |
logging.error(f"Failed to parse {file}: {str(e)}") | |
continue | |
except Exception as e: | |
logging.error(f"Failed to process external model data: {e}") | |
# 3. Собираем все лучшие результаты | |
results = list(best_model_results.values()) | |
# 4. Добавляем базовые модели по умолчанию, если список пуст | |
if not results: | |
# Добавляем несколько моделей-заглушек для отображения интерфейса | |
results = [ | |
{ | |
"model": "example/model-1", | |
"score": 0.7, | |
"math_score": 0.8, | |
"physics_score": 0.6, | |
"total_tokens": 1000000, | |
"evaluation_time": 3600.0, | |
"system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке." | |
}, | |
{ | |
"model": "example/model-2", | |
"score": 0.6, | |
"math_score": 0.7, | |
"physics_score": 0.5, | |
"total_tokens": 800000, | |
"evaluation_time": 3000.0, | |
"system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке." | |
} | |
] | |
logging.warning("No model data found, using example models") | |
# Создаем DataFrame и сортируем по общему баллу | |
df = pd.DataFrame(results) | |
df.sort_values(by='score', ascending=False, inplace=True) | |
# Округляем числовые столбцы для красивого отображения | |
numeric_cols = df.select_dtypes(include=['number']).columns | |
if not numeric_cols.empty: | |
df[numeric_cols] = df[numeric_cols].round(3) | |
return df | |
def convert_old_format_to_deatmath(data): | |
""" | |
Конвертирует данные из старого формата Small Shlepa в формат DeathMath | |
Args: | |
data (dict): Данные модели в старом формате | |
Returns: | |
dict: Конвертированные данные в формате DeathMath | |
""" | |
# Проверяем, возможно это файл уже в формате DeathMath | |
if "score" in data: | |
return data | |
# Проверяем формат Small Shlepa с полями: musicmc, moviesmc, booksmc, lawmc, mmluproru | |
small_shlepa_fields = ["musicmc", "moviesmc", "booksmc", "lawmc", "mmluproru", "model"] | |
is_shlepa_format = any(field in data for field in small_shlepa_fields) | |
if is_shlepa_format: | |
logging.info(f"Конвертация модели из формата Small Shlepa в формат DeathMath: {data.get('model', 'Unknown')}") | |
# Конвертируем данные с примерным соответствием: | |
# math_score = среднее(musicmc, booksmc, mmluproru) | |
# physics_score = lawmc или moviesmc | |
math_score = 0.0 | |
math_components = 0 | |
if "musicmc" in data and data["musicmc"] is not None: | |
math_score += float(data["musicmc"]) | |
math_components += 1 | |
if "booksmc" in data and data["booksmc"] is not None: | |
math_score += float(data["booksmc"]) | |
math_components += 1 | |
if "mmluproru" in data and data["mmluproru"] is not None: | |
math_score += float(data["mmluproru"]) | |
math_components += 1 | |
if math_components > 0: | |
math_score /= math_components | |
# Для physics_score используем значение lawmc или moviesmc (что доступно) | |
physics_score = 0.0 | |
if "lawmc" in data and data["lawmc"] is not None: | |
physics_score = float(data["lawmc"]) | |
elif "moviesmc" in data and data["moviesmc"] is not None: | |
physics_score = float(data["moviesmc"]) | |
# Общий скор - среднее арифметическое | |
avg_score = (math_score + physics_score) / 2 if math_score or physics_score else 0.0 | |
converted_data = { | |
"model_name": data.get("model", "Unknown"), | |
"score": avg_score, | |
"math_score": math_score, | |
"physics_score": physics_score, | |
"total_tokens": int(data.get("total_tokens", 0)), | |
"evaluation_time": float(data.get("evaluation_time", 0.0)), | |
"system_prompt": data.get("system_prompt", | |
"Вы - полезный помощник по математике и физике. Ответьте на русском языке.") | |
} | |
return converted_data | |
# Если формат неизвестен, возвращаем стандартный шаблон | |
logging.warning(f"Неизвестный формат данных модели, использую шаблон") | |
return { | |
"model_name": data.get("model_name", data.get("model", "Unknown")), | |
"score": 0.0, | |
"math_score": 0.0, | |
"physics_score": 0.0, | |
"total_tokens": 0, | |
"evaluation_time": 0.0, | |
"system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке." | |
} | |