Spaces:
Runtime error
Runtime error
import json
import os
import re
from datetime import datetime, timezone

import gradio as gr
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download

from src.display.about import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    EVALUATION_QUEUE_TEXT,
    INTRODUCTION_TEXT,
    LLM_BENCHMARKS_TEXT,
    FAQ_TEXT,
    TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
    BENCHMARK_COLS,
    COLS,
    EVAL_COLS,
    EVAL_TYPES,
    NUMERIC_INTERVALS,
    TYPES,
    AutoEvalColumn,
    ModelType,
    fields,
    WeightType,
    Precision,
)
from src.envs import (
    API,
    EVAL_REQUESTS_PATH,
    DYNAMIC_INFO_REPO,
    DYNAMIC_INFO_FILE_PATH,
    DYNAMIC_INFO_PATH,
    EVAL_RESULTS_PATH,
    H4_TOKEN,
    IS_PUBLIC,
    QUEUE_REPO,
    REPO_ID,
    RESULTS_REPO,
)
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.tools.collections import update_collections
from src.tools.plots import (
    create_metric_plot_obj,
    create_plot_df,
    create_scores_df,
)
def restart_space():
    """Request a restart of the Space `REPO_ID` through the `API` client, using `H4_TOKEN`."""
    API.restart_space(token=H4_TOKEN, repo_id=REPO_ID)
def init_space():
    """Download the backing datasets and build every dataframe the app needs.

    Three dataset repos (request queue, dynamic info, results) are fetched
    into their local directories with `snapshot_download`. If a download
    raises, the failure is reported and `restart_space()` is called, after
    which execution continues with the next download.

    Returns:
        A 6-tuple `(leaderboard_df, original_df, plot_df,
        finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df)`.
    """
    # (dataset repo, local directory) pairs, fetched in this order.
    downloads = (
        (QUEUE_REPO, EVAL_REQUESTS_PATH),
        (DYNAMIC_INFO_REPO, DYNAMIC_INFO_PATH),
        (RESULTS_REPO, EVAL_RESULTS_PATH),
    )
    for repo_id, local_dir in downloads:
        try:
            print(local_dir)
            snapshot_download(
                repo_id=repo_id, local_dir=local_dir, repo_type="dataset", tqdm_class=None, etag_timeout=30
            )
        except Exception as exc:
            # Same recovery as before (restart the Space), but say which
            # download failed and why instead of swallowing the error.
            print(f"Failed to download {repo_id} into {local_dir}: {exc!r}")
            restart_space()

    raw_data, original_df = get_leaderboard_df(
        results_path=EVAL_RESULTS_PATH,
        requests_path=EVAL_REQUESTS_PATH,
        dynamic_path=DYNAMIC_INFO_FILE_PATH,
        cols=COLS,
        benchmark_cols=BENCHMARK_COLS,
    )
    # Copies are handed out so neither consumer shares the original frame.
    update_collections(original_df.copy())
    leaderboard_df = original_df.copy()

    plot_df = create_plot_df(create_scores_df(raw_data))

    (
        finished_eval_queue_df,
        running_eval_queue_df,
        pending_eval_queue_df,
    ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)

    return leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df
# Build all module-level dataframes once, at import time.
(
    leaderboard_df,
    original_df,
    plot_df,
    finished_eval_queue_df,
    running_eval_queue_df,
    pending_eval_queue_df,
) = init_space()
# Searching and filtering | |
def update_table(
    hidden_df: pd.DataFrame,
    columns: list,
    type_query: list,
    precision_query: str,
    size_query: list,
    show_deleted: bool,
    show_merges: bool,
    show_moe: bool,
    show_flagged: bool,
    query: str,
):
    """Filter, search and project the leaderboard in one pass.

    The frame is narrowed with `filter_models`, then with the free-text
    `query` via `filter_queries`, and finally reduced to the requested
    `columns` by `select_columns`.

    NOTE(review): `precision_query` is annotated `str` here, but
    `filter_models` concatenates it with a list -- confirm the intended type.
    """
    matching_models = filter_models(
        df=hidden_df,
        type_query=type_query,
        size_query=size_query,
        precision_query=precision_query,
        show_deleted=show_deleted,
        show_merges=show_merges,
        show_moe=show_moe,
        show_flagged=show_flagged,
    )
    searched = filter_queries(query, matching_models)
    return select_columns(searched, columns)
def load_query(request: gr.Request):
    """Read the `query` URL parameter from the request, defaulting to "".

    The value is returned twice: once for the search bar and once for a
    hidden component that triggers a reload only when its value changes.
    """
    query = request.query_params.get("query")
    if not query:
        # Missing or empty parameter: fall back to an empty search string.
        query = ""
    return query, query
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the rows whose `AutoEvalColumn.dummy` value contains `query`.

    Matching is case-insensitive. `query` is first tried as a regular
    expression (the original behaviour); if it is not a valid pattern
    (e.g. an unbalanced parenthesis) it is matched as a literal substring
    instead of raising.

    Rows whose value is missing never match: without `na=False` the mask
    would contain NaN and boolean indexing would raise.
    """
    names = df[AutoEvalColumn.dummy.name].str
    try:
        mask = names.contains(query, case=False, na=False)
    except re.error:
        # User input that is not a valid regex: treat it as plain text.
        mask = names.contains(query, case=False, na=False, regex=False)
    return df[mask]
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Project `df` onto the never-hidden columns, the requested ones, and the dummy column.

    Requested columns are emitted in `COLS` order (not in the order given in
    `columns`), and only when they are actually present in `df`.
    """
    pinned = [col.name for col in fields(AutoEvalColumn) if col.never_hidden]
    # Walk COLS rather than `columns` so the display order stays stable.
    requested = [name for name in COLS if name in df.columns and name in columns]
    return df[pinned + requested + [AutoEvalColumn.dummy.name]]
def filter_queries(query: str, filtered_df: pd.DataFrame):
    """Apply a `;`-separated list of search terms to `filtered_df`.

    Each non-empty term is run through `search_table`; the non-empty results
    are concatenated and de-duplicated on (model, precision, revision).

    If the query is empty, contains only blank terms, or no term matches
    any row, `filtered_df` is returned unchanged.
    """
    if query == "":
        return filtered_df

    matches = []
    for term in (part.strip() for part in query.split(";")):
        if term == "":
            continue
        hits = search_table(filtered_df, term)
        if len(hits) > 0:
            matches.append(hits)

    if not matches:
        return filtered_df

    combined = pd.concat(matches)
    return combined.drop_duplicates(
        subset=[AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name]
    )
def filter_models(
    df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool, show_merges: bool, show_moe: bool, show_flagged: bool
) -> pd.DataFrame:
    """Return the rows of `df` that pass every visibility, type, precision and size filter.

    Args:
        df: Leaderboard frame containing the `AutoEvalColumn` columns used below.
        type_query: Model-type strings; only the first character of each is
            compared against the model-type-symbol column.
        size_query: Keys of `NUMERIC_INTERVALS` selecting the accepted parameter-count ranges.
        precision_query: Accepted precision names; the string "None" is always accepted too.
        show_deleted: Keep rows whose still-on-hub flag is not True.
        show_merges: Keep rows flagged as merged.
        show_moe: Keep rows flagged as MoE.
        show_flagged: Keep rows that are flagged.

    Returns:
        The filtered frame. Rows whose parameter count cannot be parsed as a
        number fall in no interval and are dropped.
    """
    filtered_df = df
    if not show_deleted:
        # Show only models that are still on the hub.
        filtered_df = filtered_df[filtered_df[AutoEvalColumn.still_on_hub.name] == True]

    if not show_merges:
        filtered_df = filtered_df[filtered_df[AutoEvalColumn.merged.name] == False]

    if not show_moe:
        filtered_df = filtered_df[filtered_df[AutoEvalColumn.moe.name] == False]

    if not show_flagged:
        filtered_df = filtered_df[filtered_df[AutoEvalColumn.flagged.name] == False]

    # Every mask below is computed from `filtered_df` itself. Building it from
    # the unfiltered `df` only works through implicit index alignment, which
    # breaks as soon as the index is not unique.
    type_emoji = [t[0] for t in type_query]
    filtered_df = filtered_df.loc[filtered_df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
    filtered_df = filtered_df.loc[filtered_df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]

    numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
    params_column = pd.to_numeric(filtered_df[AutoEvalColumn.params.name], errors="coerce")
    # astype(bool) keeps the mask boolean even when no rows are left.
    mask = params_column.apply(lambda x: any(numeric_interval.contains(x))).astype(bool)
    filtered_df = filtered_df.loc[mask]

    return filtered_df
# Default view: every model type, size bucket and precision, with deleted,
# merged and flagged models hidden and MoE models shown.
leaderboard_df = filter_models(
    leaderboard_df,
    [t.to_str(" : ") for t in ModelType],
    list(NUMERIC_INTERVALS.keys()),
    [i.value.name for i in Precision],
    show_deleted=False,
    show_merges=False,
    show_moe=True,
    show_flagged=False,
)
import unicodedata | |
def is_valid_unicode(char):
    """Return True when the Unicode database has a name for `char`.

    Characters without a name -- which includes control characters such as
    newline -- are reported as invalid.
    """
    # With a default supplied, `unicodedata.name` returns it instead of
    # raising ValueError for unnamed characters.
    return unicodedata.name(char, None) is not None
def remove_invalid_unicode(input_string):
    """Drop every character rejected by `is_valid_unicode` from a string.

    Non-string values are returned untouched.
    """
    if not isinstance(input_string, str):
        return input_string
    return ''.join(ch for ch in input_string if is_valid_unicode(ch))
# Invisible placeholder used as the second input of the interface.
dummy1 = gr.Textbox(visible=False)

# Invisible, read-only dataframe component that carries the leaderboard
# returned by the API endpoint.
hidden_leaderboard_table_for_search = gr.components.Dataframe(
    headers=COLS,
    datatype=TYPES,
    interactive=False,
    line_breaks=False,
    visible=False,
)
def display(x, y):
    """Return the `COLS` columns of the module-level `leaderboard_df`.

    Every object-dtype column of `leaderboard_df` is first cleaned in place
    with `remove_invalid_unicode`. `x` and `y` are ignored; they exist only
    because the interface is declared with two inputs.
    """
    object_columns = [
        name for name in leaderboard_df.columns if leaderboard_df[name].dtype == 'object'
    ]
    for name in object_columns:
        leaderboard_df[name] = leaderboard_df[name].apply(remove_invalid_unicode)
    return leaderboard_df[COLS]
# Markdown shown in the interface. This assignment replaces the
# INTRODUCTION_TEXT imported from src.display.about. The embedded example is
# kept as valid, copy-pasteable Python.
INTRODUCTION_TEXT = """
This is a copied space from Open LLM Leaderboard. Instead of displaying
the results as a table, this space was modified to simply provide a gradio API interface.
Using the python script below, users can access the full leaderboard data easily.
```python
# Import dependencies
from gradio_client import Client
# Initialize the Gradio client with the API URL
client = Client("https://rodrigomasini-data-only-enterprise-scenarios-leaderboard.hf.space/")
try:
    # Perform the API call
    response = client.predict("","", api_name='/predict')
    # Check if the response is directly accessible
    if len(response) > 0:
        print("Response received!")
        headers = response.get('headers', [])
        data = response.get('data', [])
        print(headers)
        # Remove comments if you want to download the dataset and save in csv format
        #import csv
        # Specify the path to your CSV file
        #csv_file_path = 'foundational-models-benchmark.csv'
        # Open the CSV file for writing
        #with open(csv_file_path, mode='w', newline='', encoding='utf-8') as file:
        #    writer = csv.writer(file)
            # Write the headers
        #    writer.writerow(headers)
            # Write the data
        #    for row in data:
        #        writer.writerow(row)
        #print(f"Results saved to {csv_file_path}")
    # If the above line prints a string that looks like JSON, you can parse it with json.loads(response)
    # Otherwise, you might need to adjust based on the actual structure of `response`
except Exception as e:
    print(f"An error occurred: {e}")
```
"""
# Expose `display` through a Gradio interface: the introduction markdown and
# the hidden textbox are the two inputs, the hidden dataframe is the output.
interface = gr.Interface(
    fn=display,
    inputs=[
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text"),
        dummy1,
    ],
    outputs=[hidden_leaderboard_table_for_search],
)

# Restart the Space every 1800 seconds from a background scheduler.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, trigger="interval", seconds=1800)
scheduler.start()

interface.launch()