import json
import logging
import os
from datetime import datetime

import src.envs as envs
from src.backend.manage_requests import EvalRequest
from src.backend.evaluate_model import Evaluator

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger("openai").setLevel(logging.WARNING)


def run_evaluation(eval_request: EvalRequest, batch_size, device,
                   local_dir: str, results_repo: str, no_cache=True, limit=None,
                   need_check=True, write_results=False):
    """
    Run the evaluation for a given model and upload the results.

    Args:
        eval_request (EvalRequest): The evaluation request object containing model details.
        batch_size (int): Batch size for processing.
        device (str): The device to run the evaluation on.
        local_dir (str): Local directory path for saving results.
        results_repo (str): Repository ID where results will be uploaded.
        no_cache (bool): Whether to disable caching.
        limit (int, optional): Limit on the number of items to process. Use with caution.
        need_check (bool): If True, results are only saved locally for review;
            if False, the result file is also uploaded to `results_repo`.
        write_results (bool): Whether to call `evaluator.write_results()` after evaluation.

    Returns:
        dict: A dictionary containing evaluation results.
    """
    if limit:
        logging.warning("WARNING: --limit SHOULD ONLY BE USED FOR TESTING. "
                        "REAL METRICS SHOULD NOT BE COMPUTED USING LIMIT.")

    output_folder = os.path.join(local_dir, *eval_request.model.split("/"))
    # if os.path.exists(output_folder):
    #     f_name = os.listdir(output_folder)[-1]
    #     print(f"Loading results from {os.path.join(output_folder, f_name)}")
    #     results = json.loads(os.path.join(output_folder, f_name))
    #     dumped = json.dumps(results, indent=2)
    #     logging.info(dumped)
    # else:
    try:
        evaluator = Evaluator(eval_request.model, eval_request.revision, eval_request.precision,
                              batch_size, device, no_cache, limit, write_out=True,
                              output_base_path='logs')
        results = evaluator.evaluate()
        if write_results:
            evaluator.write_results()

        # Upload leaderboard_summaries.csv to the Hugging Face leaderboard dataset.
        envs.API.upload_file(
            path_or_fileobj=envs.LEADERBOARD_DATASET_PATH,
            path_in_repo=envs.LEADERBOARD_DATASET_PATH.split('/')[-1],
            repo_id=envs.LEADERBOARD_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"Update results for {eval_request.model}"
        )
        logging.info(f"Leaderboard file {envs.LEADERBOARD_DATASET_PATH} has been uploaded to "
                     f"{envs.LEADERBOARD_DATASET_REPO}")
    except Exception as e:
        logging.error(f"Error during evaluation: {e}")
        raise

    dumped = json.dumps(results, indent=2)
    logging.info(dumped)

    # Use a single timestamp so the local file name and the repo path match.
    result_file_name = f"results_{datetime.now()}.json"
    output_path = os.path.join(output_folder, result_file_name)
    os.makedirs(output_folder, exist_ok=True)
    with open(output_path, "w") as f:
        f.write(dumped)
    logging.info(f"Results have been saved to {output_path}")

    if not need_check:
        path_in_repo = f"{eval_request.model}/{result_file_name}"
        logging.info(f"Path in the repo: {path_in_repo}")
        envs.API.upload_file(
            path_or_fileobj=output_path,
            path_in_repo=path_in_repo,
            repo_id=results_repo,
            repo_type="dataset",
        )
    return results
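
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). The EvalRequest
# keyword arguments below are assumptions inferred from the attributes accessed
# above (model, revision, precision); check the actual definition in
# src.backend.manage_requests before running. The repo id, model name, and
# local directory are placeholders.
# ---------------------------------------------------------------------------
# if __name__ == "__main__":
#     request = EvalRequest(model="org/model-name", revision="main", precision="float16")
#     run_evaluation(
#         eval_request=request,
#         batch_size=1,
#         device="cuda:0",
#         local_dir="./eval-results",           # placeholder output directory
#         results_repo="org/results-dataset",   # placeholder HF dataset repo id
#         limit=10,                             # small limit for a smoke test only
#         need_check=True,                      # keep True to skip the upload step
#     )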