BenCzechMark / server.py
idolezal's picture
Reduced processing in the critical section
cfb07ff
import copy
import glob
import json
import os
# `requests` must be told where the CA bundle lives. Unless both variables hold
# the correct path (or an explicitly empty string), HTTPS connections fail with
# "[Errno 101] Network is unreachable".
os.environ.update(
    dict.fromkeys(
        ("CURL_CA_BUNDLE", "REQUESTS_CA_BUNDLE"),
        "/etc/ssl/certs/ca-certificates.crt"
        if os.path.exists("/etc/ssl/certs/ca-certificates.crt")
        else "",
    )
)
# Log the effective values so the start-up output shows which branch was taken.
print(f"{os.environ.get('CURL_CA_BUNDLE') = }")
print(f"{os.environ.get('REQUESTS_CA_BUNDLE') = }")
import hashlib
import time
from datetime import datetime, timezone
import requests
from collections import namedtuple
from xml.sax.saxutils import escape as xmlEscape, quoteattr as xmlQuoteAttr
from threading import Lock
import regex as re
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, snapshot_download
from compare_significance import SUPPORTED_METRICS
# Metrics that may be displayed on the leaderboard: everything listed in
# `compare_significance.SUPPORTED_METRICS` plus macro F1.
VISIBLE_METRICS = SUPPORTED_METRICS + ["macro_f1"]
# Hugging Face Hub client shared by the upload helpers of this module.
api = HfApi()
# Both variables are mandatory; a missing one raises KeyError at import time.
HF_TOKEN = os.environ["HF_TOKEN"]
HF_RESULTS_DATASET = os.environ["HF_RESULTS_DATASET"] # <HF_RESULTS_DATASET> ::= <owner> "/" <dataset name>; e.g. CZLC/LLM_benchmark_data
# For testing purposes: when set to a non-zero integer, submissions use
# `fake_tournament` (all draws) instead of the real significance service.
HF_FAKE_TOURNAMENT = bool(int(os.environ.get("HF_FAKE_TOURNAMENT", "0")))
# JSON file describing the benchmark tasks; loaded by `LeaderboardServer.__init__`.
TASKS_METADATA_PATH = "./tasks_metadata.json"
# Markdown control characters mapped to numeric HTML entities. The mapping is
# passed as the `entities` argument of `xml.sax.saxutils.escape`, which applies
# the replacements in dictionary order.
MARKDOWN_SPECIAL_CHARACTERS = {
    "#": "&#35;",  # must stay first: every entity below contains "#", so escaping "#" later would corrupt them
    "\\": "&#92;",
    "`": "&#96;",
    "*": "&#42;",
    "_": "&#95;",
    "{": "&#123;",
    "}": "&#125;",
    "[": "&#91;",
    "]": "&#93;",
    "(": "&#40;",
    ")": "&#41;",
    "+": "&#43;",
    "-": "&#45;",
    ".": "&#46;",
    "!": "&#33;",
    "=": "&#61;",
    "|": "&#124;"
}
def uniqifyList(seq, order_preserving=True):
    """Return the distinct (hashable) items of ``seq`` as a new list.

    With ``order_preserving=True`` every item keeps the position of its first
    occurrence; otherwise the order of the result is arbitrary.
    """
    if not order_preserving:
        return list(set(seq))
    # A dict keeps insertion order, so its keys are the first occurrences in order.
    return list(dict.fromkeys(seq))
def xmlAndMarkdownEscape(text):
    """Escape ``text`` for XML/HTML and additionally neutralise Markdown syntax.

    ``&``, ``<`` and ``>`` are escaped by `xml.sax.saxutils.escape`; the
    Markdown control characters are then replaced by the numeric entities from
    ``MARKDOWN_SPECIAL_CHARACTERS``.
    """
    return xmlEscape(text, entities=MARKDOWN_SPECIAL_CHARACTERS)
class CheckSignificanceError(Exception):
    """Raised when a request to the significance-comparison service fails."""
def check_significance_is_reachable():
    """Return True when the significance service answers the test result URL.

    Any ordinary failure (network error, unexpected status code, malformed
    response) is reported as ``False``. Only ``Exception`` subclasses are
    caught, so ``KeyboardInterrupt`` and ``SystemExit`` still propagate
    instead of being silently turned into ``False``.
    """
    result_url = 'https://czechllm.fit.vutbr.cz/benczechmark-leaderboard/compare_significance/results/test'
    try:
        check_significance_wait_for_result(result_url)
    except Exception:
        return False
    return True
# Extracts the quoted "... timed out ..." reason from the text of a
# ConnectTimeoutError.
REGEX_CONNECT_TIMEOUT_ERROR = re.compile(r"""ConnectTimeoutError\(.*'(.*timed out.*)'""")


def get_timeout_error_msg(exception):
    """Return a short, human-readable message for a timeout exception.

    For a connect timeout the quoted reason is taken from the exception text
    when the pattern matches. In every other case the text after the last
    colon of the exception message is returned, stripped of whitespace.
    """
    text = str(exception)
    if isinstance(exception, requests.exceptions.ConnectTimeout):
        found = REGEX_CONNECT_TIMEOUT_ERROR.search(text)
        if found:
            return found.group(1)
    return text.rsplit(":", 1)[-1].strip()
def check_significance_repeat_on_conn_timeout(repeat, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` and retry it when `requests` times out.

    ``repeat`` is the retry budget: after each timeout it is logged and, when
    it is an ``int``, decremented; once it is falsy the next timeout is
    re-raised as ``CheckSignificanceError``. Exceptions other than
    ``requests.exceptions.Timeout`` propagate unchanged.

    NOTE(review): ``bool`` is a subclass of ``int``, so ``repeat=True`` allows
    a single retry rather than unlimited retries -- confirm that is intended.
    """
    while True:
        try:
            return fn(*args, **kwargs)
        except requests.exceptions.Timeout as timeout_error:
            error_msg = get_timeout_error_msg(timeout_error)
            if not repeat:
                raise CheckSignificanceError(error_msg)
            print(error_msg, f"({repeat = })")
            if isinstance(repeat, int):
                repeat -= 1
def check_significance_send_task(model_a_path, model_b_path, repeat_on_conn_timeout=10):
    """Submit two model result files for a significance comparison.

    Args:
        model_a_path: Path of the file uploaded as ``model_a``.
        model_b_path: Path of the file uploaded as ``model_b``.
        repeat_on_conn_timeout: Retry budget passed to
            ``check_significance_repeat_on_conn_timeout``.

    Returns:
        The URL of the response (``response.url``), which is later polled for
        the result.

    Raises:
        CheckSignificanceError: The server is busy (429), answered with an
            unexpected status code, or the request kept timing out.
    """
    url = 'https://czechllm.fit.vutbr.cz/benczechmark-leaderboard/compare_significance/'

    # prepare and send request
    with (
        open(model_a_path, 'rb') as model_a_fp,
        open(model_b_path, 'rb') as model_b_fp,
    ):
        files = {
            'model_a': model_a_fp,
            'model_b': model_b_fp,
        }

        def post_from_start():
            # `requests` reads each file object to EOF while building the
            # multipart body. Without rewinding, a retry after a timeout would
            # upload two empty files.
            model_a_fp.seek(0)
            model_b_fp.seek(0)
            return requests.post(url, files=files, timeout=60 * 5)

        response = check_significance_repeat_on_conn_timeout(
            repeat_on_conn_timeout,
            post_from_start,
        )

    # check response
    if response.status_code == 202:
        return response.url
    if response.status_code == 429:
        raise CheckSignificanceError('Server is too busy. Please try again later.')
    raise CheckSignificanceError(f'Failed to submit task. Status code: {response.status_code}')
def check_significance_wait_for_result(result_url, repeat_on_conn_timeout=10):
    """Poll ``result_url`` until the comparison task has finished.

    The URL is requested again every 5 seconds while the server answers 202.
    A 200 response ends the polling and its JSON body is inspected.

    Returns:
        ``result['result']`` of the JSON body when its state is "COMPLETED".

    Raises:
        CheckSignificanceError: Unexpected status code, repeated timeouts, or
            a finished task whose state is not "COMPLETED".
    """
    while True:
        response = check_significance_repeat_on_conn_timeout(
            repeat_on_conn_timeout,
            requests.get, result_url, timeout=60 * 5
        )
        status = response.status_code
        if status == 202:
            # Still being computed; ask again later.
            time.sleep(5)
            continue
        if status != 200:
            raise CheckSignificanceError(f'Failed to get result. Status code: {response.status_code}')
        result = response.json()
        break

    if result["state"] != "COMPLETED":
        raise CheckSignificanceError(result['result']['error'])
    return result['result']
def check_significance(model_a_path, model_b_path):
    """Compare two model files: submit the task, then wait for and return its result."""
    return check_significance_wait_for_result(
        check_significance_send_task(model_a_path, model_b_path)
    )
class NoneLock:
    """No-op stand-in with the same calling convention as ``TimeoutLock``.

    Entering always "succeeds" (yields ``True``) and nothing is ever locked.
    """

    def __init__(self, *args, **kwargs):
        # Arguments are accepted only for signature compatibility.
        pass

    def __call__(self, *args, **kwargs):
        # Mirrors `TimeoutLock.__call__`: hand out a fresh context manager.
        return NoneLock(*args, **kwargs)

    def __enter__(self):
        return True

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None lets exceptions from the `with` body propagate.
        return None
class TimeoutLock:
    """Context manager around a lock whose acquisition may time out.

    ``with lock(timeout=5) as acquired:`` yields whether the lock was obtained
    within the timeout; the body runs either way, so callers must check
    ``acquired``. The lock is released on exit only if it was acquired.
    """

    def __init__(self, lock=None, timeout=-1):
        # A timeout of -1 means "wait indefinitely" for `Lock.acquire`.
        self.lock = lock if lock else Lock()
        self.timeout = timeout
        self._lock_acquired = False

    def __call__(self, timeout):
        # New manager sharing the same underlying lock, with its own timeout.
        return TimeoutLock(lock=self.lock, timeout=timeout)

    def __enter__(self):
        got_lock = self.lock.acquire(timeout=self.timeout)
        if got_lock:
            self._lock_acquired = True
        return got_lock

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self._lock_acquired:
            return
        self.lock.release()
        self._lock_acquired = False
class _ReadLock:
    """Reader side of ``ReadWriteLock``: counts the currently active readers.

    The shared lock is held only while the counter is updated, not for the
    duration of the ``with`` body.
    """

    def __init__(self, lock):
        self._lock = lock
        self.reading = 0  # number of readers currently inside a `with` block

    def __enter__(self):
        self._lock.acquire()
        try:
            self.reading += 1
        finally:
            self._lock.release()

    def __exit__(self, exc_type, exc_value, traceback):
        self._lock.acquire()
        try:
            self.reading -= 1
        finally:
            self._lock.release()
class ReadWriteLock:
    """
    Lock that ensures nobody is reading while a write is in progress and that
    only one writer writes at a time.

    Use ``with lock.ro:`` for reading and ``with lock.rw:`` (or ``with lock:``)
    for writing. A writer holds the internal lock for the whole ``with`` body.
    """

    def __init__(self):
        self._lock = Lock()
        self.ro = _ReadLock(self._lock)
        self.rw = self

    def __enter__(self):
        lock = self._lock
        lock.acquire()
        # Wait until no reader is active, giving the lock back between checks
        # so that readers can leave.
        while (readers := self.ro.reading) != 0:
            lock.release()
            if readers < 0:
                # More exits than entries: the reader counter is corrupted.
                raise RuntimeError()
            time.sleep(1)
            lock.acquire()

    def __exit__(self, exc_type, exc_value, traceback):
        self._lock.release()
class LeaderboardServer:
def __init__(self):
    """Load the task metadata, set up state and locks, then build the leaderboard.

    Construction performs I/O: it reads ``TASKS_METADATA_PATH``, runs the
    results-dataset integrity check and builds all leaderboard tables.
    """
    self.SERVER_ADDRESS = HF_RESULTS_DATASET
    self.REPO_TYPE = "dataset"
    # Context manager so the metadata file handle is closed right away
    # (previously it was left open until garbage collection).
    with open(TASKS_METADATA_PATH) as tasks_metadata_file:
        self.TASKS_METADATA = json.load(tasks_metadata_file)
    self.TASKS_CATEGORIES = {self.TASKS_METADATA[task]["category"] for task in self.TASKS_METADATA}
    self.TASKS_CATEGORY_OVERALL = "Overall"
    self.TASKS_CATEGORY_OVERALL_DETAILS = "Overall with details"
    self.CATEGORY_TO_TASK_ABBREVIATION_TO_DETAILS = self._prepare_category_to_task_abbr_to_details()
    self.MAX_LENGTH_OF_MODEL_TITLE = 28
    self.DIR_DATAFRAMES_CSV = "./dataframes_csv"

    # State guarded by `var_lock`
    self.var_lock = ReadWriteLock()
    self.submission_ids = set()
    self.submission_id_to_file = {}  # Map submission ids to file paths
    self.submission_id_to_model_title = {}
    self.submission_id_to_data = {}  # Only data (results and metadata) used by the leaderboard
    self.tournament_results = None
    self.tournament_results_corrupted = False
    self.tournament_results_integrity_solving = False
    self.tournament_results_integrity_solving_progress = 0

    self.leaderboard_dataframes = {}  # For each category
    self.tournament_dataframes = {}  # For each submission_id and category

    self.leaderboard_dataframes_csv = {}  # For each category
    self.tournament_dataframes_csv = {}  # For each submission_id and category

    # Local snapshot of the results dataset and the lock guarding it
    self.results_dataset_local_snapshot_lock = ReadWriteLock()
    self.results_dataset_local_snapshot = None

    self.pre_submit = {}  # submission_id -> PreSubmit of a not yet confirmed submission
    self.submit_lock = TimeoutLock()

    self.results_dataset_integrity_check()  # Check integrity of the results dataset after (re)start of the Hugging Face Space
    self.update_leaderboard()
def _update_models_and_tournament_results(self):
    """Refresh the local dataset snapshot and reload everything derived from it.

    Downloads the results dataset, re-reads the submission files and replaces
    ``self.tournament_results`` with the content of the downloaded tournament
    file.
    """
    with self.results_dataset_local_snapshot_lock.rw:
        snapshot_path = snapshot_download(
            self.SERVER_ADDRESS,
            repo_type=self.REPO_TYPE,
            token=HF_TOKEN,
            local_dir="./",
        )
        self.results_dataset_local_snapshot = snapshot_path

    # Both loaders take the snapshot lock for reading themselves, so they must
    # run after the write section above has been left.
    self.fetch_existing_models()
    loaded_results = self.load_tournament_results()

    with self.var_lock.rw:
        self.tournament_results = loaded_results
def update_leaderboard(self):
    """Reload the dataset and rebuild every cached leaderboard/tournament table.

    Rebuilds, in this order, the leaderboard tables, the per-submission
    tournament tables and the CSV exports of both. While
    ``self.tournament_results_corrupted`` is set, each table is replaced by a
    one-column placeholder frame.
    """
    self._update_models_and_tournament_results()

    categories = [self.TASKS_CATEGORY_OVERALL, self.TASKS_CATEGORY_OVERALL_DETAILS] + sorted(self.TASKS_CATEGORIES)

    def build_or_placeholder(builder, *args, **kwargs):
        # The flag is re-read for every single table.
        if self.tournament_results_corrupted:
            return pd.DataFrame(columns=['Corrupted, please check integrity'])
        return builder(*args, **kwargs)

    leaderboard_dataframes = {}
    for category in categories:
        leaderboard_dataframes[category] = build_or_placeholder(self._get_leaderboard, category=category)

    with self.var_lock.ro:
        submission_ids = self.submission_ids

    tournament_dataframes = {}
    for submission_id in submission_ids:
        tables_by_category = {}
        for category in categories:
            tables_by_category[category] = build_or_placeholder(
                self._get_model_tournament_table, submission_id, category
            )
        tournament_dataframes[submission_id] = tables_by_category

    with self.var_lock.rw:
        self.leaderboard_dataframes = leaderboard_dataframes
        self.tournament_dataframes = tournament_dataframes

    leaderboard_dataframes_csv = {}
    for category in categories:
        table = build_or_placeholder(self._get_leaderboard, category=category, to_csv=True)
        leaderboard_dataframes_csv[category] = self._dataframe_to_csv(table, f"Leaderboard - {category}.csv")

    with self.var_lock.ro:
        tournament_dataframes_csv = {}
        for submission_id in submission_ids:
            files_by_category = {}
            for category in categories:
                table = build_or_placeholder(
                    self._get_model_tournament_table, submission_id, category, to_csv=True
                )
                # The file name uses the shortened model name with "/" replaced.
                model_name = self.submission_id_to_data[submission_id]['submission_metadata']['model_name']
                short_name = model_name[:self.MAX_LENGTH_OF_MODEL_TITLE].replace('/', '_')
                files_by_category[category] = self._dataframe_to_csv(
                    table,
                    f"Tournament table - {short_name} - {category}.csv",
                )
            tournament_dataframes_csv[submission_id] = files_by_category

    with self.var_lock.rw:
        self.leaderboard_dataframes_csv = leaderboard_dataframes_csv
        self.tournament_dataframes_csv = tournament_dataframes_csv
def load_tournament_results(self):
    """Return the parsed "tournament.json" of the local snapshot.

    An empty dict is returned when the snapshot contains no such file.
    """
    with self.results_dataset_local_snapshot_lock.ro:
        tournament_path = os.path.join(self.results_dataset_local_snapshot, "tournament.json")
        if os.path.exists(tournament_path):
            with open(tournament_path) as tournament_file:
                return json.load(tournament_file)
        return {}
def _prepare_category_to_task_abbr_to_details(self):
    """Build ``{category: {abbreviation: (abbreviation, name, source_url)}}``.

    Each inner mapping is ordered by abbreviation. Besides one entry per task
    category, the two overall categories are included; they share one mapping
    object that contains the tasks of all categories.
    """
    def ordered_by_abbreviation(mapping):
        return {abbreviation: mapping[abbreviation] for abbreviation in sorted(mapping)}

    # Group the task ids by category, in order of first appearance.
    tasks_by_category = {}
    for task, task_metadata in self.TASKS_METADATA.items():
        tasks_by_category.setdefault(task_metadata["category"], []).append(task)

    # The overall entry is created first so that it is the first key.
    all_tasks = {}
    details_by_category = {self.TASKS_CATEGORY_OVERALL: all_tasks}
    for category, tasks in tasks_by_category.items():
        details = {}
        for task in tasks:
            task_metadata = self.TASKS_METADATA[task]
            details[task_metadata["abbreviation"]] = (
                task_metadata["abbreviation"],
                task_metadata["name"],
                task_metadata["source_url"],
            )
        details = ordered_by_abbreviation(details)
        details_by_category[category] = details
        all_tasks.update(details)

    all_tasks_sorted = ordered_by_abbreviation(all_tasks)
    details_by_category[self.TASKS_CATEGORY_OVERALL] = all_tasks_sorted
    details_by_category[self.TASKS_CATEGORY_OVERALL_DETAILS] = all_tasks_sorted
    return details_by_category
def fetch_existing_models(self):
    """Re-read all submission files of the local snapshot into memory.

    Every ``data/*.json`` file that has a "submission_metadata" section is
    registered; files without it are skipped. The four submission mappings on
    ``self`` are then replaced together under the write lock.
    """
    # Models data
    submission_ids = set()
    submission_id_to_file = {}
    submission_id_to_model_title = {}
    submission_id_to_data = {}

    with self.results_dataset_local_snapshot_lock.ro:
        for submission_file in glob.glob(os.path.join(self.results_dataset_local_snapshot, "data") + "/*.json"):
            # Context manager: previously one file handle per submission was
            # left open until garbage collection.
            with open(submission_file) as submission_fp:
                data = json.load(submission_fp)
            metadata = data.get("submission_metadata")
            if metadata is None:
                continue
            submission_id = metadata["submission_id"]

            submission_ids.add(submission_id)
            submission_id_to_file[submission_id] = submission_file
            submission_id_to_model_title[submission_id] = metadata["team_name"] + "/" + metadata["model_name"]
            # Keep only what the leaderboard needs (no predictions).
            submission_id_to_data[submission_id] = {
                "results": data["results"],
                "metadata": data.get("metadata", {}),
                "submission_metadata": metadata,
            }

    with self.var_lock.rw:
        self.submission_ids = submission_ids
        self.submission_id_to_file = submission_id_to_file
        self.submission_id_to_model_title = submission_id_to_model_title
        self.submission_id_to_data = submission_id_to_data
def results_dataset_integrity_check(self, solve=False):
    """
    Check (and optionally repair) the consistency of the results dataset.

    Checks that:
    - all models have been in a duel with all other models
    -- if not, confirmation of new submissions is blocked (the submit lock is
       held) and, with ``solve=True``, the remaining matches are played
    -- the files in the "/data" directory and the file "tournament.json" are
       what is compared
    - "tournament.json" contains no `submission_id` without a file in the
      "/data" directory
    - "tournament.json" is not regenerated from scratch; only the missing
      parts are computed

    With ``solve=False`` an inconsistency only sets
    ``self.tournament_results_corrupted``. The method waits (retrying every
    10 seconds) until the submit lock can be acquired.
    """
    def datetime2str(d):
        return d.strftime("%Y-%m-%dT%H:%M:%S %Z")

    while True:
        with self.submit_lock(timeout=5) as acquired:
            if acquired:
                gr.Info('Checking integrity...', duration=15)
                self._update_models_and_tournament_results()

                with self.var_lock.ro:
                    # `or` short-circuits, so the per-submission checks run
                    # only when every submission has a tournament entry.
                    corrupted = (
                        # Is every `submission_id` in results known?
                        self.tournament_results.keys() - self.submission_ids != set()
                        # Was every `submission_id` in some match?
                        or self.submission_ids - self.tournament_results.keys() != set()
                        # Are all competitors known?
                        or any(
                            self.tournament_results[submission_id].keys() - self.submission_ids != set()
                            for submission_id in self.submission_ids
                        )
                        # Has every `submission_id` had a match with all competitors?
                        or any(
                            self.submission_ids - self.tournament_results[submission_id].keys() != set()
                            for submission_id in self.submission_ids
                        )
                    )
                    if not corrupted:
                        self.tournament_results_corrupted = False
                        break

                if solve:
                    self.tournament_results_integrity_solving = True
                    self.tournament_results_integrity_solving_progress = 0

                    renew_tournament_began_datetime = datetime.now(timezone.utc)
                    print(f"Renew tournament began at {datetime2str(renew_tournament_began_datetime)}")
                    gr.Info('Running tournament...', duration=15)

                    with self.var_lock.rw:
                        submission_ids_for_renew_tournament = set()

                        submission_ids_not_known = self.tournament_results.keys() - self.submission_ids
                        submission_ids_not_in_tournament = self.submission_ids - self.tournament_results.keys()

                        submission_ids_for_renew_tournament |= submission_ids_not_in_tournament

                        # Drop results of submissions whose data file is gone.
                        for submission_id in submission_ids_not_known:
                            self.tournament_results.pop(submission_id)

                        for submission_id in self.submission_ids:
                            matches = self.tournament_results.get(submission_id)
                            if matches is None:
                                # No tournament entry at all: already queued
                                # above. Indexing the results here used to
                                # raise KeyError for exactly this case.
                                continue

                            competitor_ids_not_known = matches.keys() - self.submission_ids
                            competitor_ids_not_in_tournament = self.submission_ids - matches.keys()

                            for competitor_id in competitor_ids_not_known:
                                matches.pop(competitor_id)

                            if competitor_ids_not_in_tournament:
                                submission_ids_for_renew_tournament.add(submission_id)

                    for i, submission_id in enumerate(submission_ids_for_renew_tournament):
                        self.tournament_results_integrity_solving_progress = i / len(submission_ids_for_renew_tournament)

                        with self.var_lock.ro:
                            file = self.submission_id_to_file[submission_id]

                        # Only the matches that are still missing get played.
                        tournament_results = self.start_tournament(submission_id, file)

                        with self.var_lock.rw:
                            self.tournament_results = tournament_results

                    self.tournament_results_integrity_solving_progress = 1

                    renew_tournament_ended_datetime = datetime.now(timezone.utc)
                    print(f"Renew tournament ended at {datetime2str(renew_tournament_ended_datetime)}")
                    renew_tournament_ended_time_elapsed = renew_tournament_ended_datetime - renew_tournament_began_datetime
                    print(f"Time elapsed: {renew_tournament_ended_time_elapsed}")

                    gr.Info('Uploading tournament results...', duration=5)
                    if self.tournament_results:
                        self._upload_tournament_results(self.tournament_results)

                    self.tournament_results_integrity_solving = False
                    self.tournament_results_corrupted = False
                else:
                    self.tournament_results_corrupted = True

                break

        # The submit lock is busy: tell the user and try again later.
        gr.Info("Waiting in queue...", duration=5)
        time.sleep(10)

    gr.Info('Integrity of the results dataset is checked', duration=5)
@staticmethod
def _model_tournament_table_highlight_true_and_false(x):
    """Return a frame of CSS strings shaped like ``x`` for table styling.

    Cells that are true (or whose text contains ">true<") get a green
    background, cells that are false (or contain ">false<") a red one, and all
    other cells an empty style. Rows are addressed by the labels
    ``0 .. len(x) - 1``. ``x`` itself is not modified.
    """
    green = 'background-color: rgba(0, 255, 0, 0.1);'
    red = 'background-color: rgba(255, 0, 0, 0.1);'

    def css_for(value):
        lowered = str(value).lower()
        # `==` (not `is`) on purpose: the original comparison is kept as is.
        if value == True or ">true<" in lowered:
            return green
        if value == False or ">false<" in lowered:
            return red
        return ''

    df_css = x.copy()
    for column in df_css:
        for row in range(len(df_css.index)):
            df_css.loc[row, column] = css_for(x.loc[row, column])
    return df_css
def get_model_tournament_table_csv(self, submission_id, category, pre_submit=None):
    """Return the path of the CSV export of a model's tournament table.

    Without ``pre_submit`` the path cached by ``update_leaderboard`` is
    returned. With ``pre_submit`` the table is computed from the pre-submit
    tournament results and written to a dedicated pre-submit file.
    """
    # Identity test: `== None` would invoke the argument's `__eq__`.
    if pre_submit is None:
        with self.var_lock.ro:
            return self.tournament_dataframes_csv[submission_id][category]

    return self._dataframe_to_csv(
        self._get_model_tournament_table(submission_id, category, pre_submit=pre_submit, to_csv=True),
        f"Tournament table - pre-submit - {category}.csv",
    )
def get_model_tournament_table(self, submission_id, category, pre_submit=None):
    """Return the tournament table of a model for one category.

    Without ``pre_submit`` a shallow copy of the cached table is returned.
    With ``pre_submit`` the table is computed from the pre-submit tournament
    results.
    """
    # Identity test: `== None` would invoke the argument's `__eq__`.
    if pre_submit is None:
        with self.var_lock.ro:
            return copy.copy(self.tournament_dataframes[submission_id][category])

    return self._get_model_tournament_table(submission_id, category, pre_submit=pre_submit)
def _get_model_tournament_table(self, submission_id, category, pre_submit=None, to_csv=False):
    """Build the table of duels of one submission against all its competitors.

    One row per competitor (the self-match is skipped) and one column per task
    of ``category``; the two overall categories include every task.

    Args:
        submission_id: Submission whose matches are listed.
        category: Task category, or one of the two overall categories.
        pre_submit: Optional ``PreSubmit``; when given, its tournament results
            are used instead of the stored ones.
        to_csv: When true, return a plain DataFrame with raw "significant"
            values and a separate link column; otherwise return a styled
            table whose cells are HTML snippets.

    Raises:
        gr.Error: A competitor has no known submission data.
    """
    overall_categories = (self.TASKS_CATEGORY_OVERALL, self.TASKS_CATEGORY_OVERALL_DETAILS)
    rows = []

    with self.var_lock.ro:
        if pre_submit:
            tournament_results = pre_submit.tournament_results
        else:
            tournament_results = self.tournament_results
        own_matches = tournament_results[submission_id]

        for competitor_id in own_matches.keys() - {submission_id}:  # without self
            if competitor_id in self.submission_id_to_data:
                data = self.submission_id_to_data[competitor_id]
            elif pre_submit and competitor_id == pre_submit.submission_id:
                data = pre_submit.data
            else:
                raise gr.Error(f"Internal error: Submission [{competitor_id}] not found")

            match = own_matches[competitor_id]
            row = {}
            for task, task_metadata in self.TASKS_METADATA.items():
                if category != task_metadata["category"] and category not in overall_categories:
                    continue
                if to_csv:
                    row[task] = match[task]["significant"]
                    continue
                # "significant" and "p_value" go first: the key order ends up
                # in the tooltip text, which influences DataFrame sorting.
                details = dict.fromkeys(["significant", "p_value"])
                details.update(copy.deepcopy(match[task]))
                significant = str(details["significant"]).lower()  # originally a bool
                details["significant"] = significant
                tooltip = "\n".join(f"{k}: {v}" for k, v in details.items())
                row[task] = f'<abbr title={xmlQuoteAttr(tooltip)}>{significant}</abbr>'

            submission_metadata = data["submission_metadata"]
            model_link = submission_metadata["link_to_model"]
            model_title = submission_metadata["team_name"] + "/" + submission_metadata["model_name"]
            if to_csv:
                row["model"] = model_title
                row["link_to_model"] = model_link
            else:
                team_abbr = self.abbreviate(submission_metadata["team_name"], self.MAX_LENGTH_OF_MODEL_TITLE)
                name_abbr = self.abbreviate(submission_metadata["model_name"], self.MAX_LENGTH_OF_MODEL_TITLE)
                title_html = f'<div style="font-size: 10px;">{xmlAndMarkdownEscape(team_abbr)}</div>{xmlAndMarkdownEscape(name_abbr)}'
                row["model"] = f'<a href={xmlQuoteAttr(model_link)} title={xmlQuoteAttr(model_title)}>{title_html}</a>'

            rows.append(row)

    dataframe = pd.DataFrame.from_records(rows)

    # Column order: the model columns first, then everything else sorted by
    # name; only columns that actually exist are kept.
    leading_columns = ["model", "link_to_model"]
    candidates = leading_columns + sorted(list(self.TASKS_METADATA.keys()) + list(dataframe.columns))
    df_order = [key for key in dict.fromkeys(candidates) if key in dataframe.columns]
    dataframe = dataframe[df_order]

    # Task ids become their abbreviations, the model columns get readable headers.
    headers = {task: task_metadata["abbreviation"] for task, task_metadata in self.TASKS_METADATA.items()}
    headers.update({
        "model": "Competitor",
        "link_to_model": "Link to model",
    })
    dataframe = dataframe.rename(columns=headers)

    if to_csv:
        return dataframe
    return dataframe.style.apply(self._model_tournament_table_highlight_true_and_false, axis=None)
def _dataframe_to_csv(self, dataframe, filename):
    """Write ``dataframe`` (without its index) to ``filename`` in the CSV directory.

    The directory ``self.DIR_DATAFRAMES_CSV`` is created when missing.

    Returns:
        The path of the written file.
    """
    # `exist_ok=True` makes the creation safe against a concurrent caller
    # creating the directory at the same moment.
    os.makedirs(self.DIR_DATAFRAMES_CSV, exist_ok=True)

    filepath = os.path.join(self.DIR_DATAFRAMES_CSV, filename)
    dataframe.to_csv(filepath, index=False)
    return filepath
def get_leaderboard_csv(self, pre_submit=None, category=None):
    """Return the path of the CSV export of the leaderboard for ``category``.

    A missing category means the overall category. Without ``pre_submit`` the
    path cached by ``update_leaderboard`` is returned; with ``pre_submit`` the
    leaderboard including the pre-submitted model is computed and written to
    a dedicated pre-submit file.
    """
    # Resolve the default up front so the pre-submit file is not named
    # "... - None.csv" when no category was given.
    category = category if category else self.TASKS_CATEGORY_OVERALL

    # Identity test: `== None` would invoke the argument's `__eq__`.
    if pre_submit is None:
        with self.var_lock.ro:
            return self.leaderboard_dataframes_csv[category]

    return self._dataframe_to_csv(
        self._get_leaderboard(pre_submit=pre_submit, category=category, to_csv=True),
        f"Leaderboard - pre-submit - {category}.csv",
    )
def get_leaderboard(self, pre_submit=None, category=None):
    """Return the leaderboard table for ``category``.

    Without ``pre_submit`` a shallow copy of the cached table is returned (a
    missing category means the overall category). With ``pre_submit`` the
    leaderboard including the pre-submitted model is computed.
    """
    # Identity test: `== None` would invoke the argument's `__eq__`.
    if pre_submit is None:
        category = category if category else self.TASKS_CATEGORY_OVERALL
        with self.var_lock.ro:
            return copy.copy(self.leaderboard_dataframes[category])

    return self._get_leaderboard(pre_submit=pre_submit, category=category)
def _get_leaderboard(self, pre_submit=None, category=None, to_csv=False):
    """Compute the leaderboard table for one category.

    For every submission and task, the task score is the percentage of
    competitors against which the submission has a "significant" result (100
    when there is no competitor). Category scores are the mean of their task
    scores and "average_score" is the mean of the category scores (or the
    score of the single selected category).

    Args:
        pre_submit: Optional ``PreSubmit``; its tournament results are used
            and its row is pinned to the top of the table.
        category: Task category or one of the two overall categories;
            defaults to the overall category.
        to_csv: When true, emit plain model title/link columns and the
            "n-shot" column instead of an HTML link.

    Returns:
        A DataFrame sorted by average score (descending), or a one-column
        placeholder frame when there are no tournament results.

    Raises:
        gr.Error: A submission in the tournament results has no matching data.
    """
    with self.var_lock.ro:
        tournament_results = pre_submit.tournament_results if pre_submit else self.tournament_results
        category = category if category else self.TASKS_CATEGORY_OVERALL

        if len(tournament_results) == 0:
            return pd.DataFrame(columns=['No submissions yet'])

        overall_categories = (self.TASKS_CATEGORY_OVERALL, self.TASKS_CATEGORY_OVERALL_DETAILS)
        processed_results = []
        # Collected across ALL submissions. It used to be reset per
        # submission, so only the metric columns of the last processed
        # submission received a readable header.
        visible_metrics_map_word_to_header = {}

        for submission_id in tournament_results.keys():
            if submission_id not in self.submission_id_to_data:
                if pre_submit and submission_id == pre_submit.submission_id:
                    data = pre_submit.data
                else:
                    raise gr.Error(f"Internal error: Submission [{submission_id}] not found")
            else:
                data = self.submission_id_to_data[submission_id]

            if submission_id != data["submission_metadata"]["submission_id"]:
                raise gr.Error(f"Proper submission [{submission_id}] not found")

            local_results = {}
            win_score = {}
            for task in self.TASKS_METADATA.keys():
                task_category = self.TASKS_METADATA[task]["category"]
                if category not in overall_categories and category != task_category:
                    continue

                # Share of competitors beaten significantly on this task.
                num_of_competitors = 0
                num_of_wins = 0
                for competitor_id in tournament_results[submission_id].keys() - {submission_id}:  # without self
                    num_of_competitors += 1
                    if tournament_results[submission_id][competitor_id][task]["significant"]:
                        num_of_wins += 1
                task_score = num_of_wins / num_of_competitors * 100 if num_of_competitors > 0 else 100
                win_score.setdefault(task_category, []).append(task_score)

                if category in (task_category, self.TASKS_CATEGORY_OVERALL_DETAILS):
                    local_results[task] = task_score
                    # The task's own metric is tried first, then the visible ones.
                    for metric in uniqifyList([self.TASKS_METADATA[task]["metric"]] + VISIBLE_METRICS):
                        visible_metrics_map_word_to_header[task + "_" + metric] = self.TASKS_METADATA[task]["abbreviation"] + " " + metric
                        metric_value = data['results'][task].get(metric)
                        if metric_value is not None:
                            # Every metric except word perplexity is shown as a percentage.
                            local_results[task + "_" + metric] = metric_value if metric == "word_perplexity" else metric_value * 100
                            break  # Only the first metric of every task

            for c in win_score:
                win_score[c] = sum(win_score[c]) / len(win_score[c])

            if category in overall_categories:
                if category == self.TASKS_CATEGORY_OVERALL:
                    for c in win_score:
                        local_results[c] = win_score[c]
                local_results["average_score"] = sum(win_score.values()) / len(win_score)
            else:
                local_results["average_score"] = win_score[category]

            model_link = data["submission_metadata"]["link_to_model"]
            model_title = data["submission_metadata"]["team_name"] + "/" + data["submission_metadata"]["model_name"]
            if to_csv:
                local_results["model"] = model_title
                local_results["link_to_model"] = model_link
            else:
                model_title_abbr_team_name = self.abbreviate(data["submission_metadata"]["team_name"], self.MAX_LENGTH_OF_MODEL_TITLE)
                model_title_abbr_model_name = self.abbreviate(data["submission_metadata"]["model_name"], self.MAX_LENGTH_OF_MODEL_TITLE)
                model_title_abbr_html = f'<div style="font-size: 10px;">{xmlAndMarkdownEscape(model_title_abbr_team_name)}</div>{xmlAndMarkdownEscape(model_title_abbr_model_name)}'
                local_results["model"] = f'<a href={xmlQuoteAttr(model_link)} title={xmlQuoteAttr(model_title)}>{model_title_abbr_html}</a>'

            if to_csv:
                n_shot = data["metadata"].get("n-shot", "")
                local_results["n-shot"] = n_shot

            release = data["submission_metadata"].get("submission_timestamp")
            release = time.strftime("%Y-%m-%d", time.gmtime(release)) if release else "N/A"
            local_results["release"] = release
            local_results["model_type"] = data["submission_metadata"]["model_type"]
            local_results["parameters"] = data["submission_metadata"]["parameters"]

            # The pre-submitted model always goes to the first row.
            if pre_submit and submission_id == pre_submit.submission_id:
                processed_results.insert(0, local_results)
            else:
                processed_results.append(local_results)

        dataframe = pd.DataFrame.from_records(processed_results)

        extra_attributes_map_word_to_header = {
            "model": "Model",
            "release": "Release",
            "average_score": "Average ⬆️",
            "team_name": "Team name",
            "model_name": "Model name",
            "model_type": "Type",
            "parameters": "# θ (B)",
            "input_length": "Input length (# tokens)",
            "precision": "Precision",
            "description": "Description",
            "link_to_model": "Link to model",
        }
        first_attributes = [
            "model",
            "link_to_model",
            "release",
            "model_type",
            "parameters",
            "n-shot",
            "average_score",
        ]
        df_order = [
            key
            for key in dict.fromkeys(
                first_attributes
                + sorted(
                    list(self.TASKS_METADATA.keys())
                    + list(dataframe.columns)
                )
            ).keys()
            if key in dataframe.columns
        ]

        # Sort columns
        dataframe = dataframe[df_order]

        # Sort rows
        if pre_submit:
            # Keep the pre-submit row on top; sort only the remaining rows.
            first_row_with_pre_submit = dataframe.iloc[0]
            dataframe = dataframe.iloc[1:].sort_values(by=["average_score"], ascending=False)
            dataframe = pd.concat([first_row_with_pre_submit.to_frame().T, dataframe])
        else:
            dataframe = dataframe.sort_values(by=["average_score"], ascending=False)

        # Rename columns
        attributes_map_word_to_header = {key: value["abbreviation"] for key, value in self.TASKS_METADATA.items()}
        attributes_map_word_to_header.update(extra_attributes_map_word_to_header)
        attributes_map_word_to_header.update(visible_metrics_map_word_to_header)
        dataframe = dataframe.rename(
            columns=attributes_map_word_to_header
        )

        return dataframe
def fake_tournament(self, new_submission_id, new_model_file):
    """Return tournament results where every missing match is a fake draw.

    Testing counterpart of ``start_tournament``: nothing is sent to the
    significance service. The stored tournament results are deep-copied, the
    results of an earlier pre-submit of the same submission are reused, and
    every match still missing is filled in (both directions) with a
    non-significant result flagged ``"fake": True``.

    Args:
        new_submission_id: Id of the submission entering the tournament.
        new_model_file: Unused; kept for signature parity with
            ``start_tournament``.
    """
    DRAW_MATCH = {
        task: {
            "significant": False,
            "p_value": 0.5,
            "delta": 0.0,
            "fake": True,
        }
        for task in self.TASKS_METADATA.keys()
    }

    with self.var_lock.ro:
        new_tournament = copy.deepcopy(self.tournament_results)

        pre_submit = self.pre_submit.get(new_submission_id)
        if pre_submit:
            new_tournament[new_submission_id] = pre_submit.tournament_results[new_submission_id]
            for competitor_id in pre_submit.tournament_results[new_submission_id].keys() - {new_submission_id}:
                new_tournament[competitor_id][new_submission_id] = pre_submit.tournament_results[competitor_id][new_submission_id]

        if new_submission_id not in new_tournament:
            new_tournament[new_submission_id] = {}
            new_tournament[new_submission_id][new_submission_id] = copy.deepcopy(DRAW_MATCH)

        competitor_ids_in_tournament = new_tournament[new_submission_id].keys()
        rest_of_competitors = list(self.submission_ids - {new_submission_id} - competitor_ids_in_tournament)  # without self and without the opponents with which it has already contended

    for competitor_id in rest_of_competitors:
        new_tournament[new_submission_id][competitor_id] = copy.deepcopy(DRAW_MATCH)
        # A stored competitor may have no tournament entry yet (inconsistent
        # dataset); create it instead of failing with KeyError.
        new_tournament.setdefault(competitor_id, {})[new_submission_id] = copy.deepcopy(DRAW_MATCH)

    return new_tournament
def start_tournament(self, new_submission_id, new_model_file):
    """Play all missing matches of a submission and return the new results.

    The stored tournament results are deep-copied, the results of an earlier
    pre-submit of the same submission are reused, and every stored submission
    the new one has not met yet is compared with it in both directions via
    the significance service. At most 5 competitors (10 tasks) are in flight
    at once.

    Args:
        new_submission_id: Id of the submission entering the tournament.
        new_model_file: Path of its result file.

    Returns:
        The updated tournament results; ``self.tournament_results`` itself is
        left untouched.

    Raises:
        CheckSignificanceError: A comparison could not be submitted or failed.
    """
    with self.var_lock.ro:
        new_tournament = copy.deepcopy(self.tournament_results)

        pre_submit = self.pre_submit.get(new_submission_id)
        if pre_submit:
            new_tournament[new_submission_id] = pre_submit.tournament_results[new_submission_id]
            for competitor_id in pre_submit.tournament_results[new_submission_id].keys() - {new_submission_id}:
                new_tournament[competitor_id][new_submission_id] = pre_submit.tournament_results[competitor_id][new_submission_id]

        if new_submission_id not in new_tournament:
            new_tournament[new_submission_id] = {}
            # The match against itself is a draw by definition.
            new_tournament[new_submission_id][new_submission_id] = {
                task: {
                    "significant": False,
                    "p_value": 0.5,
                    "delta": 0.0,
                }
                for task in self.TASKS_METADATA.keys()
            }

        competitor_ids_in_tournament = new_tournament[new_submission_id].keys()
        rest_of_competitors = list(self.submission_ids - {new_submission_id} - competitor_ids_in_tournament)  # without self and without the opponents with which it has already contended

    num_of_competitors = len(rest_of_competitors)

    result_url = {}
    result_inverse_url = {}

    def send_match(competitor_id):
        # Two tasks per competitor: new vs. competitor and the inverse.
        result_url[competitor_id] = check_significance_send_task(new_model_file, self.submission_id_to_file[competitor_id])
        result_inverse_url[competitor_id] = check_significance_send_task(self.submission_id_to_file[competitor_id], new_model_file)

    while rest_of_competitors:
        # Fill the first batch.
        next_competitors = []
        while rest_of_competitors and len(next_competitors) < 5:  # 5*2==10 tasks
            next_competitors.append(rest_of_competitors.pop())

        for competitor_id in next_competitors:
            send_match(competitor_id)

        while next_competitors:
            competitor_id = next_competitors.pop(0)
            result = check_significance_wait_for_result(result_url.pop(competitor_id))
            result_inverse = check_significance_wait_for_result(result_inverse_url.pop(competitor_id))

            # Keep the pipeline full: start the next match as soon as one is done.
            if rest_of_competitors:
                new_competitor_id = rest_of_competitors.pop()
                next_competitors.append(new_competitor_id)
                send_match(new_competitor_id)

            new_tournament[new_submission_id][competitor_id] = result
            # A stored competitor may have no tournament entry yet (e.g. while
            # repairing a dataset with several missing submissions); create
            # it instead of failing with KeyError.
            new_tournament.setdefault(competitor_id, {})[new_submission_id] = result_inverse

            num_of_competitors_done = num_of_competitors - len(next_competitors) - len(rest_of_competitors)
            gr.Info(f"Tournament: {num_of_competitors_done}/{num_of_competitors} = {(num_of_competitors_done) * 100 // num_of_competitors}% done")

    return new_tournament
@staticmethod
def abbreviate(s, max_length, dots_place="center"):
    """Shorten ``s`` to about ``max_length`` characters using an ellipsis.

    Strings that already fit are returned unchanged. Otherwise "…" replaces
    the beginning, the middle or the end of the string, depending on
    ``dots_place`` ("begin", "center" or anything else for the end).
    Whitespace next to the ellipsis is stripped, so the result may be shorter
    than ``max_length``. For ``max_length <= 1`` only "…" is returned, and
    "center" needs ``max_length >= 3`` (else the end is replaced).
    """
    if len(s) <= max_length:
        return s
    if max_length <= 1:
        return "…"
    if dots_place == "begin":
        return "…" + s[-max_length + 1:].lstrip()
    if dots_place == "center" and max_length >= 3:
        head_length = max_length // 2
        tail_length = max_length - head_length - 1
        return s[:head_length].rstrip() + "…" + s[-tail_length:].lstrip()
    # dots_place == "end"
    return s[:max_length - 1].rstrip() + "…"
@staticmethod
def create_submission_id(metadata):
    """Derive the submission id from team name, model name and both hashes.

    The first 7 characters of each of the four metadata values are joined
    with "_"; slashes and newlines are replaced by "_" and surrounding
    whitespace is stripped.
    """
    # The length of the ID must be limited because it is used in a file name
    id_fields = (
        "team_name",
        "model_name",
        "model_predictions_sha256",
        "model_results_sha256",
    )
    raw_id = "_".join(metadata[field][:7] for field in id_fields)
    return raw_id.replace("/", "_").replace("\n", "_").strip()
@staticmethod
def get_sha256_hexdigest(obj):
    """Return the SHA-256 hex digest of the canonical JSON form of ``obj``.

    The object is serialised compactly, with sorted keys and ASCII-only
    output, so equal objects hash equally regardless of key order.
    """
    canonical_json = json.dumps(
        obj,
        sort_keys=True,
        ensure_ascii=True,
        separators=(',', ':'),
    )
    return hashlib.sha256(canonical_json.encode()).hexdigest()
# Result of a (pre-)submission: the tournament results including the new
# model, its id, the path of its file and the data kept for the leaderboard.
PreSubmit = namedtuple('PreSubmit', 'tournament_results, submission_id, file, data')


def prepare_model_for_submission(self, file, metadata) -> PreSubmit:
    """Stamp an uploaded result file with metadata and run its pre-submit tournament.

    ``metadata`` is extended in place with the hashes of the predictions and
    results, the derived submission id and the current timestamp. It is
    stored in the file under "submission_metadata", and the file is rewritten
    as compact JSON. Nothing is uploaded at this stage.

    Returns:
        The ``PreSubmit`` produced by ``_prepare_model_for_submission``.
    """
    with open(file, "r") as source:
        data = json.load(source)

    data["submission_metadata"] = metadata

    # The id is derived from the hashes, so they must be stored first.
    metadata["model_predictions_sha256"] = self.get_sha256_hexdigest(data["predictions"])
    metadata["model_results_sha256"] = self.get_sha256_hexdigest(data["results"])
    metadata["submission_id"] = self.create_submission_id(metadata)
    metadata["submission_timestamp"] = time.time()  # timestamp

    with open(file, "w") as target:
        json.dump(data, target, separators=(',', ':'))  # compact JSON

    return self._prepare_model_for_submission(file, data=data, do_submit=False)
def save_model_submission(self, file, data=None) -> PreSubmit:
    """Confirm a submission: run its tournament and upload everything.

    Thin wrapper around ``_prepare_model_for_submission`` with
    ``do_submit=True``.
    """
    return self._prepare_model_for_submission(file, do_submit=True, data=data)
def _prepare_model_for_submission(self, file, data=None, do_submit=False) -> PreSubmit:
    """Run the tournament for a submission file and optionally publish it.

    Args:
        file: Path of the submission file; it must already contain
            "submission_metadata" with a "submission_id".
        data: Parsed content of ``file``; read from the file when falsy.
        do_submit: When true, the work is serialised through
            ``self.submit_lock`` and the submission, the tournament results
            and the id-to-title map are uploaded. When false, a ``NoneLock``
            is used and nothing is uploaded.

    Returns:
        The ``PreSubmit`` record, which is also stored in ``self.pre_submit``.
    """
    # The file is opened even when `data` is supplied, so it must exist.
    with open(file, "r") as f:
        if not data:
            data = json.load(f)

    submission_id = data["submission_metadata"]["submission_id"]

    # Retry until the submit lock has been acquired.
    while True:
        submit_lock = self.submit_lock if do_submit else NoneLock()
        with submit_lock(timeout=5) as acquired:
            if acquired:
                info_msg = 'Running tournament...'
                gr.Info(info_msg, duration=40)
                if do_submit:
                    print(f"Locked `submit_lock` for {submission_id = }")
                    print(info_msg)

                # Refresh the local state first so the tournament is played
                # against the current set of submissions.
                self.update_leaderboard()
                if HF_FAKE_TOURNAMENT:
                    tournament_results = self.fake_tournament(submission_id, file)
                else:
                    tournament_results = self.start_tournament(submission_id, file)

                pre_submit = self.PreSubmit(
                    tournament_results,
                    submission_id,
                    file,
                    {
                        "results": data["results"],
                        "metadata": data.get("metadata", {}),
                        "submission_metadata": data["submission_metadata"],
                    }
                )
                # Remembered so that a later call for the same submission can
                # reuse the matches that have already been played.
                self.pre_submit[submission_id] = pre_submit

                info_msg = 'Tournament finished!'
                gr.Info(info_msg, duration=2)
                if do_submit:
                    print(info_msg)

                    gr.Info("Uploading…", duration=40)
                    self._upload_submission(pre_submit.submission_id, pre_submit.file)
                    self._upload_tournament_results(pre_submit.tournament_results)
                    self.update_leaderboard()
                    self._upload_submission_id_to_model_title()  # needs to be after update_leaderboard()
                    # Printed just before the `with` block releases the lock.
                    print(f"Unlocked `submit_lock` for {submission_id = }")

                break

        # The lock is busy: tell the user and try again later.
        gr.Info("Waiting in queue...", duration=5)
        time.sleep(10)

    return pre_submit
def _upload_submission_id_to_model_title(self):
    """Write the id-to-model-title map into the snapshot and upload it.

    The map is saved as "submission_id_to_model_title.json" (readable JSON)
    in the local snapshot and uploaded under the same name to the results
    dataset.
    """
    file_name = "submission_id_to_model_title.json"

    with self.results_dataset_local_snapshot_lock.rw:
        # Temporarily save the map of submission ids to model titles
        local_path = os.path.join(self.results_dataset_local_snapshot, file_name)
        with open(local_path, "w") as f:
            json.dump(self.submission_id_to_model_title, f, sort_keys=True, indent=2)  # readable JSON

        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo="submission_id_to_model_title.json",
            repo_id=self.SERVER_ADDRESS,
            repo_type=self.REPO_TYPE,
            token=HF_TOKEN,
        )
def _upload_submission(self, submission_id, file):
    """Upload a submission file to the results dataset as ``data/<submission_id>.json``."""
    target_path = f"data/{submission_id}.json"
    api.upload_file(
        path_or_fileobj=file,
        path_in_repo=target_path,
        repo_id=self.SERVER_ADDRESS,
        repo_type=self.REPO_TYPE,
        token=HF_TOKEN,
    )
def _upload_tournament_results(self, tournament_results):
    """Write the tournament results into the snapshot and upload them.

    The results are saved as "tournament.json" (readable JSON) in the local
    snapshot and uploaded under the same name to the results dataset.
    """
    with self.results_dataset_local_snapshot_lock.rw:
        # Temporarily save the tournament results
        local_path = os.path.join(self.results_dataset_local_snapshot, "tournament.json")
        with open(local_path, "w") as f:
            json.dump(tournament_results, f, sort_keys=True, indent=2)  # readable JSON

        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo="tournament.json",
            repo_id=self.SERVER_ADDRESS,
            repo_type=self.REPO_TYPE,
            token=HF_TOKEN,
        )
def get_model_detail(self, submission_id):
    """Return the submission metadata of a stored submission.

    Raises:
        gr.Error: The submission id is unknown.
    """
    with self.var_lock.ro:
        if submission_id not in self.submission_id_to_data:
            raise gr.Error(f"Submission [{submission_id}] not found")
        return self.submission_id_to_data[submission_id]["submission_metadata"]