import copy import glob import json import os # Necessary for `requests`. Without set correct path or empty string it fails during process HTTPS connection with this: [Errno 101] Network is unreachable if os.path.exists("/etc/ssl/certs/ca-certificates.crt"): os.environ["CURL_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt" os.environ["REQUESTS_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt" else: os.environ["CURL_CA_BUNDLE"] = "" os.environ["REQUESTS_CA_BUNDLE"] = "" print(f"{os.environ.get('CURL_CA_BUNDLE') = }") print(f"{os.environ.get('REQUESTS_CA_BUNDLE') = }") import hashlib import time import requests from collections import namedtuple from xml.sax.saxutils import escape as xmlEscape, quoteattr as xmlQuoteAttr from threading import Lock import gradio as gr import pandas as pd from huggingface_hub import HfApi, snapshot_download from compare_significance import SUPPORTED_METRICS VISIBLE_METRICS = SUPPORTED_METRICS + ["macro_f1"] api = HfApi() ORG = "CZLC" REPO = f"{ORG}/LLM_benchmark_data" HF_TOKEN = os.environ.get("HF_TOKEN") TASKS_METADATA_PATH = "./tasks_metadata.json" MARKDOWN_SPECIAL_CHARACTERS = { "#": "#", # for usage in xml.sax.saxutils.escape as entities must be first "\\": "\", "`": "`", "*": "*", "_": "_", "{": "{", "}": "}", "[": "[", "]": "]", "(": "(", ")": ")", "+": "+", "-": "-", ".": ".", "!": "!", "=": "=", "|": "|" } def xmlAndMarkdownEscape(text): return xmlEscape(text, MARKDOWN_SPECIAL_CHARACTERS) def check_significance_send_task(model_a_path, model_b_path): url = 'https://czechllm.fit.vutbr.cz/benczechmark-leaderboard/compare_significance/' # prepare and send request with ( open(model_a_path, 'rb') as model_a_fp, open(model_b_path, 'rb') as model_b_fp, ): files = { 'model_a': model_a_fp, 'model_b': model_b_fp, } response = requests.post(url, files=files, timeout=60 * 5) # check response if response.status_code == 202: result_url = response.url #task_id = response.json()['task_id'] elif response.status_code == 429: raise RuntimeError('Server is too busy. Please try again later.') # TODO: try-except do raise gr.error else: raise RuntimeError(f'Failed to submit task. Status code: {response.status_code}') # TODO: try-except do raise gr.error return result_url def check_significance_wait_for_result(result_url): while True: response = requests.get(result_url, timeout=60 * 5) if response.status_code == 200: result = response.json() break elif response.status_code == 202: time.sleep(5) else: raise RuntimeError(f'Failed to get result. Status code: {response.status_code}') # TODO: try-except do raise gr.error if result["state"] == "COMPLETED": return result['result'] else: raise RuntimeError(result['result']['error']) def check_significance(model_a_path, model_b_path): result_url = check_significance_send_task(model_a_path, model_b_path) result = check_significance_wait_for_result(result_url) return result pre_submit_lock = Lock() class _ReadLock: def __init__(self, lock): self._lock = lock self.reading = 0 def __enter__(self): with self._lock: self.reading += 1 def __exit__(self, exc_type, exc_value, traceback): with self._lock: self.reading -= 1 class ReadWriteLock: """ Zámek, který ověří, že nikdo nečte když se zapisuje a že zapisuje pouze jeden """ def __init__(self): self._lock = Lock() self.ro = _ReadLock(self._lock) self.rw = self def __enter__(self): self._lock.acquire() while True: reading = self.ro.reading if reading > 0: self._lock.release() time.sleep(1) self._lock.acquire() elif reading < 0: self._lock.release() raise RuntimeError() else: return def __exit__(self, exc_type, exc_value, traceback): self._lock.release() class LeaderboardServer: def __init__(self): self.SERVER_ADDRESS = REPO self.REPO_TYPE = "dataset" self.TASKS_METADATA = json.load(open(TASKS_METADATA_PATH)) self.TASKS_CATEGORIES = {self.TASKS_METADATA[task]["category"] for task in self.TASKS_METADATA} self.TASKS_CATEGORY_OVERALL = "Overall" self.CATEGORY_TO_TASK_ABBREVIATION_TO_DETAILS = self._prepare_category_to_task_abbr_to_details() self.var_lock = ReadWriteLock() self.submission_ids = set() self.submission_id_to_file = {} # Map submission ids to file paths self.submission_id_to_model_title = {} self.submission_id_to_data = {} # Only data (results and metadata) using by leaderboard self.tournament_results = None self.results_dataset_local_snapshot_lock = ReadWriteLock() self.results_dataset_local_snapshot = None self.pre_submit_lock = pre_submit_lock self.pre_submit = None self.update_leaderboard() self.results_dataset_integrity_check() # Check integrity of the results dataset after (re)start Hugging Face Space def update_leaderboard(self): with self.results_dataset_local_snapshot_lock.rw: self.results_dataset_local_snapshot = snapshot_download( self.SERVER_ADDRESS, repo_type=self.REPO_TYPE, token=HF_TOKEN, local_dir="./", ) self.fetch_existing_models() tournament_results = self.load_tournament_results() with self.var_lock.rw: self.tournament_results = tournament_results def load_tournament_results(self): with self.results_dataset_local_snapshot_lock.ro: metadata_rank_paths = os.path.join(self.results_dataset_local_snapshot, "tournament.json") if not os.path.exists(metadata_rank_paths): return {} with open(metadata_rank_paths) as ranks_file: results = json.load(ranks_file) return results def _prepare_category_to_task_abbr_to_details(self): tasks_per_category = {} for task in self.TASKS_METADATA: task_category = self.TASKS_METADATA[task]["category"] tasks_per_category.setdefault(task_category, list()).append(task) category2abbreviation2name = {self.TASKS_CATEGORY_OVERALL: {}} for category, tasks in tasks_per_category.items(): abbreviation2name = { self.TASKS_METADATA[t]["abbreviation"]: ( self.TASKS_METADATA[t]["abbreviation"], self.TASKS_METADATA[t]["name"], self.TASKS_METADATA[t]["source_url"], ) for t in tasks } sorted_abbreviation2name = dict.fromkeys(sorted(abbreviation2name.keys())) sorted_abbreviation2name.update(abbreviation2name) category2abbreviation2name[category] = sorted_abbreviation2name category2abbreviation2name[self.TASKS_CATEGORY_OVERALL].update(sorted_abbreviation2name) abbreviation2name = category2abbreviation2name[self.TASKS_CATEGORY_OVERALL] sorted_abbreviation2name = dict.fromkeys(sorted(abbreviation2name.keys())) sorted_abbreviation2name.update(abbreviation2name) category2abbreviation2name[self.TASKS_CATEGORY_OVERALL] = sorted_abbreviation2name return category2abbreviation2name def fetch_existing_models(self): # Models data submission_ids = set() submission_id_to_file = {} submission_id_to_model_title = {} submission_id_to_data = {} with self.results_dataset_local_snapshot_lock.ro: for submission_file in glob.glob(os.path.join(self.results_dataset_local_snapshot, "data") + "/*.json"): data = json.load(open(submission_file)) metadata = data.get('metadata') if metadata is None: continue submission_id = metadata["submission_id"] submission_ids.add(submission_id) submission_id_to_file[submission_id] = submission_file submission_id_to_model_title[submission_id] = metadata["team_name"] + "/" + metadata["model_name"] submission_id_to_data[submission_id] = {"results": data["results"], "metadata": metadata} with self.var_lock.rw: self.submission_ids = submission_ids self.submission_id_to_file = submission_id_to_file self.submission_id_to_model_title = submission_id_to_model_title self.submission_id_to_data = submission_id_to_data def results_dataset_integrity_check(self): """ Zkontroluje, že: - všechny modely byly v duelu se všemi -- pokud ne, znemožní potvrzení nových submitů a udělá zbývající zápasy -- kontroluje soubory v adresáři "/data" a soubor "tournament.json" - v souboru "tournament.json" není `submission_id`, které by nemělo soubor v adresáři "/data" """ while True: with self.pre_submit_lock: if self.pre_submit == None: gr.Info('Checking integrity...', duration=15) self.update_leaderboard() with self.var_lock.ro: # Is every `submission_id` in results known? if self.tournament_results.keys() - self.submission_ids != set(): pass # Was every `submission_id` in some match? elif self.submission_ids - self.tournament_results.keys() != set(): pass # Are all competitors known? elif any( self.tournament_results[submission_id].keys() - self.submission_ids != set() for submission_id in self.submission_ids ): pass # Has had every `submission_id` match with all competitors? elif any( self.submission_ids - self.tournament_results[submission_id].keys() != set() for submission_id in self.submission_ids ): pass else: break gr.Info('Running tournament...', duration=15) with self.var_lock.rw: self.tournament_results = {} submission_ids_backup = self.submission_ids self.submission_ids = set() for submission_id in submission_ids_backup: with self.var_lock.ro: file = self.submission_id_to_file[submission_id] tournament_results = self.start_tournament(submission_id, file) with self.var_lock.rw: self.tournament_results = tournament_results self.submission_ids.add(submission_id) gr.Info('Uploading tournament results...', duration=5) if self.tournament_results: self._upload_tournament_results(self.tournament_results) break gr.Info("Waiting in queue...", duration=5) time.sleep(10) gr.Info('Integrity of the results dataset is checked', duration=5) @staticmethod def _model_tournament_table_highlight_true_and_false(x): df_css = x.copy() for c in df_css: for i in range(len(df_css.index)): if x[c].iloc[i] == True or ">true<" in str(x[c].iloc[i]).lower(): df_css[c].iloc[i] = 'background-color: rgba(0, 255, 0, 0.1);' elif x[c].iloc[i] == False or ">false<" in str(x[c].iloc[i]).lower(): df_css[c].iloc[i] = 'background-color: rgba(255, 0, 0, 0.1);' else: df_css[c].iloc[i] = '' return df_css def get_model_tournament_table(self, submission_id, category): if category == self.TASKS_CATEGORY_OVERALL: return None model_tournament_table = [] with self.var_lock.ro: for competitor_id in self.tournament_results[submission_id].keys() - {submission_id}: # without self data = self.submission_id_to_data[competitor_id] match_results = {} for task in self.tournament_results[submission_id][competitor_id]: task_category = self.TASKS_METADATA[task]["category"] if task_category == category: match_task_result_details = dict.fromkeys(["significant", "p_value"]) # order has impact to sorting DataFrame match_task_result_details.update(copy.deepcopy(self.tournament_results[submission_id][competitor_id][task])) match_task_result_details["significant"] = str(match_task_result_details["significant"]).lower() # originaly bool match_task_result_significant = match_task_result_details["significant"] match_task_result_details = "\n".join(f"{k}: {v}" for k, v in match_task_result_details.items()) match_results[task] = f'{match_task_result_significant}' model_link = data["metadata"]["link_to_model"] model_title = data["metadata"]["team_name"] + "/" + data["metadata"]["model_name"] model_title_abbr_team_name = self.abbreviate(data["metadata"]["team_name"], 28) model_title_abbr_model_name = self.abbreviate(data["metadata"]["model_name"], 28) model_title_abbr_html = f'