"""Watch the evaluation request queue and report pending models to a webhook."""

import json
import os
from collections import defaultdict
from typing import Any, Dict, List, Tuple

import requests
from huggingface_hub import snapshot_download

from src.envs import EVAL_REQUESTS_PATH, QUEUE_REPO

# Upper bound, in seconds, on how long a single webhook delivery may block.
_WEBHOOK_TIMEOUT_SECONDS = 10

# Request files are read only from directories exactly this many levels
# below the queue root.
_REQUEST_FILE_DEPTH = 1


def webhook_bot(msg: str) -> None:
    """Post *msg* to the webhook configured in ``DISCORD_WEBHOOK_URL``.

    Does nothing when the environment variable is unset or empty.

    Args:
        msg: Text sent as the ``content`` field of the JSON payload.

    Raises:
        requests.RequestException: If the request cannot be delivered,
            including when it exceeds the timeout.
    """
    bot_url = os.environ.get("DISCORD_WEBHOOK_URL", "")
    if not bot_url:
        return
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    requests.post(bot_url, json={"content": msg}, timeout=_WEBHOOK_TIMEOUT_SECONDS)


def read_all_pending_model(EVAL_REQUESTS_PATH: str) -> Dict[str, List[Tuple[Any, str]]]:
    """Collect the request files of every model that has a pending request.

    Every ``*.json`` file located in a direct sub-directory of
    ``EVAL_REQUESTS_PATH`` is parsed and grouped by its ``"model"`` value.

    Args:
        EVAL_REQUESTS_PATH: Root directory of the downloaded request queue.

    Returns:
        A mapping from model name to all ``(request, file_path)`` pairs found
        for that model, restricted to models for which at least one request
        has ``"status"`` equal to ``"PENDING"``.

    Raises:
        json.JSONDecodeError: If a request file is not valid JSON.
        KeyError: If a request lacks the ``"model"`` or ``"status"`` key.
    """
    requests_by_model: Dict[str, List[Tuple[Any, str]]] = defaultdict(list)

    for root, dirs, files in os.walk(EVAL_REQUESTS_PATH):
        # Measure depth on the path relative to the queue root so that a
        # trailing separator or an un-normalized root does not shift the count.
        relative_root = os.path.relpath(root, EVAL_REQUESTS_PATH)
        if relative_root == os.curdir:
            depth = 0
        else:
            depth = relative_root.count(os.sep) + 1
        if depth < _REQUEST_FILE_DEPTH:
            continue

        # Nothing below this level is ever read, so stop descending.
        dirs[:] = []

        for file in files:
            if not file.endswith(".json"):
                continue
            file_abs_path = os.path.join(root, file)
            with open(file_abs_path, "r", encoding="utf-8") as f:
                info = json.load(f)
            requests_by_model[info["model"]].append((info, file_abs_path))

    return {
        model: rows
        for model, rows in requests_by_model.items()
        if any(info["status"] == "PENDING" for info, _ in rows)
    }


def watch_submit_queue():
    """Refresh the local request queue and report the pending models.

    Downloads a snapshot of ``QUEUE_REPO`` into ``EVAL_REQUESTS_PATH``, gathers
    the models that have a pending request and sends their count and names
    through :func:`webhook_bot`.

    This is a best-effort task: any failure is printed and swallowed so that
    the caller is never interrupted.
    """
    try:
        snapshot_download(
            repo_id=QUEUE_REPO,
            local_dir=EVAL_REQUESTS_PATH,
            repo_type="dataset",
            tqdm_class=None,
            etag_timeout=30,
        )
        pending_by_model = read_all_pending_model(EVAL_REQUESTS_PATH)
        # The helper only returns models with at least one pending request,
        # keyed by model name; sorting keeps the message order stable.
        pending_model = sorted(pending_by_model)
        pending_model_str = '\n'.join(pending_model)
        webhook_bot(f'Leaderboard model pending: {len(pending_model)}\n### Models\n{pending_model_str}')
    except Exception as e:
        print(f'Watch submit queue error: {e}')