Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import json | |
import logging | |
import pathlib | |
import pandas as pd | |
import gradio as gr | |
import schedule | |
import time | |
from datetime import datetime, timezone | |
from src.display.utils import EvalQueueColumn | |
from src.envs import API | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class VoteManager: | |
def __init__(self, votes_path, eval_requests_path, repo_id): | |
self.votes_path = votes_path | |
self.eval_requests_path = eval_requests_path | |
self.repo_id = repo_id | |
self.vote_dataset = self.read_vote_dataset() | |
self.vote_check_set = self.make_check_set(self.vote_dataset) | |
self.votes_to_upload = [] | |
def init_vote_dataset(self): | |
self.vote_dataset = self.read_vote_dataset() | |
self.vote_check_set = self.make_check_set(self.vote_dataset) | |
def read_vote_dataset(self): | |
result = [] | |
votes_file = pathlib.Path(self.votes_path) / "votes_data.jsonl" | |
if votes_file.exists(): | |
with open(votes_file, "r") as f: | |
for line in f: | |
data = json.loads(line.strip()) | |
result.append(data) | |
result = pd.DataFrame(result) | |
return result | |
def make_check_set(self, vote_dataset: pd.DataFrame): | |
result = list() | |
for row in vote_dataset.itertuples(index=False, name='vote'): | |
result.append((row.model, row.revision, row.username)) | |
return set(result) | |
def get_model_revision(self, selected_model: str) -> str: | |
"""Fetch the revision for the given model from the request files.""" | |
for user_folder in pathlib.Path(self.eval_requests_path).iterdir(): | |
if user_folder.is_dir(): | |
for file in user_folder.glob("*.json"): | |
with open(file, "r") as f: | |
data = json.load(f) | |
if data.get("model") == selected_model: | |
return data.get("revision", "main") | |
return "main" | |
def create_request_vote_df(self, pending_models_df: gr.Dataframe): | |
if pending_models_df.empty or "model_name" not in pending_models_df.columns: | |
return pending_models_df | |
self.vote_dataset = self.read_vote_dataset() | |
vote_counts = self.vote_dataset.groupby(['model', 'revision']).size().reset_index(name='vote_count') | |
pending_models_df_votes = pd.merge( | |
pending_models_df, | |
vote_counts, | |
left_on=["model_name", 'revision'], | |
right_on=['model', 'revision'], | |
how='left' | |
) | |
# Filling empty votes | |
pending_models_df_votes['vote_count'] = pending_models_df_votes['vote_count'].fillna(0) | |
pending_models_df_votes = pending_models_df_votes.sort_values(by=["vote_count", "model_name"], ascending=[False, True]) | |
# Removing useless columns | |
pending_models_df_votes = pending_models_df_votes.drop(["model_name", "model"], axis=1) | |
return pending_models_df_votes | |
# Function to be called when a user votes for a model | |
def add_vote( | |
self, | |
selected_model: str, | |
pending_models_df: gr.Dataframe | None, | |
profile: gr.OAuthProfile | None | |
): | |
logger.debug(f"Type of list before usage: {type(list)}") | |
# model_name, revision, user_id, timestamp | |
if selected_model in ["str", ""]: | |
gr.Warning("No model selected") | |
return | |
if profile is None: | |
gr.Warning("Hub Login required") | |
return | |
vote_username = profile.username | |
model_revision = self.get_model_revision(selected_model) | |
# tuple (immutable) for checking than already voted for model | |
check_tuple = (selected_model, model_revision, vote_username) | |
if check_tuple in self.vote_check_set: | |
gr.Warning("Already voted for this model") | |
return | |
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") | |
vote_obj = { | |
"model": selected_model, | |
"revision": model_revision, | |
"username": vote_username, | |
"timestamp": current_time | |
} | |
# Append the vote to the JSONL file | |
try: | |
votes_file = pathlib.Path(self.votes_path) / "votes_data.jsonl" | |
with open(votes_file, "a") as f: | |
f.write(json.dumps(vote_obj) + "\n") | |
logger.info(f"Vote added locally: {vote_obj}") | |
self.votes_to_upload.append(vote_obj) | |
except Exception as e: | |
logger.error(f"Failed to write vote to file: {e}") | |
gr.Warning("Failed to record vote. Please try again") | |
return | |
self.vote_check_set.add(check_tuple) | |
gr.Info(f"Voted for {selected_model}") | |
if pending_models_df is None: | |
return | |
return self.create_request_vote_df(pending_models_df) | |
def upload_votes(self): | |
if self.votes_to_upload: | |
votes_file = pathlib.Path(self.votes_path) / "votes_data.jsonl" | |
try: | |
with open(votes_file, "rb") as f: | |
API.upload_file( | |
path_or_fileobj=f, | |
path_in_repo="votes_data.jsonl", | |
repo_id=self.repo_id, | |
repo_type="dataset", | |
commit_message="Updating votes_data.jsonl with new votes", | |
) | |
logger.info("Votes uploaded to votes repository") | |
self.votes_to_upload.clear() | |
except Exception as e: | |
logger.error(f"Failed to upload votes to repository: {e}") | |
def run_scheduler(vote_manager): | |
while True: | |
schedule.run_pending() | |
time.sleep(1) | |