xinchen9's picture
refine (#12)
e8db64e verified
import subprocess
import gradio as gr
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download
from src.about import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT,
INTRODUCTION_TEXT,
LLM_BENCHMARKS_TEXT,
TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
BENCHMARK_COLS,
COLS,
EVAL_COLS,
EVAL_TYPES,
NUMERIC_INTERVALS,
TYPES,
AutoEvalColumn,
ModelType,
fields,
WeightType,
Precision
)
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO, TOKEN
# from src.populate import get_evaluation_queue_df, get_leaderboard_df
# from src.submission.submit import add_new_eval
# from PIL import Image
# from dummydatagen import dummy_data_for_plot, create_metric_plot_obj_1, dummydf
# import copy
def load_data(data_path):
columns = ['Unlearned_Methods','Pre-ASR','Post-ASR','FID','CLIP-Score']
columns_sorted = ['Unlearned_Methods','Pre-ASR','Post-ASR','FID','CLIP-Score']
df = pd.read_csv(data_path).dropna()
# df['Post-ASR'] = df['Post-ASR'].round(0)
# rank according to the Score column
df = df.sort_values(by='Post-ASR', ascending=False)
# reorder the columns
df = df[columns_sorted]
return df
def restart_space():
API.restart_space(repo_id=REPO_ID)
# try:
# print(EVAL_REQUESTS_PATH)
# snapshot_download(
# repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
# )
# except Exception:
# restart_space()
# try:
# print(EVAL_RESULTS_PATH)
# snapshot_download(
# repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
# )
# except Exception:
# restart_space()
# raw_data, original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
# leaderboard_df = original_df.copy()
# (
# finished_eval_queue_df,
# running_eval_queue_df,
# pending_eval_queue_df,
# ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
all_columns = ['Unlearned_Methods','Pre-ASR','Post-ASR','FID','CLIP-Score']
show_columns = ['Unlearned_Methods','Pre-ASR','Post-ASR','FID','CLIP-Score']
TYPES = ['str','number','number','number','number']
files = ['nudity','vangogh', 'church','garbage','parachute','tench']
csv_path='./assets/'+files[0]+'.csv'
df_results = load_data(csv_path)
methods = list(set(df_results['Unlearned_Methods']))
df_results_init = df_results.copy()[show_columns]
def update_table(
hidden_df: pd.DataFrame,
model1_column: list,
#type_query: list,
#open_query: list,
# precision_query: str,
# size_query: list,
# show_deleted: bool,
query: str,
):
# filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted)
# filtered_df = filter_queries(query, filtered_df)
# df = select_columns(filtered_df, columns)
filtered_df = hidden_df.copy()
# print(open_query)
# filtered_df = filtered_df[filtered_df['Unlearned_Methods'].isin(open_query)]
# map_open = {'open': 'Yes', 'closed': 'No'}
# filtered_df = filtered_df[filtered_df['Open?'].isin([map_open[o] for o in open_query])]
filtered_df=select_columns(filtered_df,model1_column)
filtered_df = filter_queries(query, filtered_df)
# map_open = {'SD V1.4', 'SD V1.5', 'SD V2.0'}
# filtered_df = filtered_df[filtered_df["Diffusion_Models"].isin([o for o in open_query])]
# filtered_df = filtered_df[[map_columns[k] for k in columns]]
# deduplication
# df = df.drop_duplicates(subset=["Model"])
df = filtered_df.drop_duplicates()
# df = df[show_columns]
return df
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
return df[(df['Unlearned_Methods'].str.contains(query, case=False))]
def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
final_df = []
if query != "":
queries = [q.strip() for q in query.split(";")]
for _q in queries:
_q = _q.strip()
if _q != "":
temp_filtered_df = search_table(filtered_df, _q)
if len(temp_filtered_df) > 0:
final_df.append(temp_filtered_df)
if len(final_df) > 0:
filtered_df = pd.concat(final_df)
return filtered_df
def search_table_model(df: pd.DataFrame, query: str) -> pd.DataFrame:
return df[(df['Diffusion_Models'].str.contains(query, case=False))]
def filter_queries_model(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
final_df = []
# if query != "":
# queries = [q.strip() for q in query.split(";")]
for _q in query:
print(_q)
if _q != "":
temp_filtered_df = search_table_model(filtered_df, _q)
if len(temp_filtered_df) > 0:
final_df.append(temp_filtered_df)
if len(final_df) > 0:
filtered_df = pd.concat(final_df)
return filtered_df
def select_columns(df: pd.DataFrame, columns_1: list) -> pd.DataFrame:
always_here_cols = ['Unlearned_Methods']
# We use COLS to maintain sorting
all_columns =['Pre-ASR','Post-ASR','FID','CLIP-Score']
if (len(columns_1)) == 0:
filtered_df = df[
always_here_cols +
[c for c in all_columns if c in df.columns]
]
else:
filtered_df = df[
always_here_cols +
[c for c in all_columns if c in df.columns and (c in columns_1) ]
]
return filtered_df
demo = gr.Blocks(css=custom_css)
with demo:
with gr.Row():
gr.Image("./assets/logo.png", height="175px", width="675px", scale=0.2,
show_download_button=False, container=False)
gr.HTML(TITLE, elem_id="title")
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
gr.Markdown(EVALUATION_QUEUE_TEXT,elem_classes="eval-text")
gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="reference-text")
with gr.Tabs(elem_classes="tab-buttons") as tabs:
with gr.TabItem("πŸ”ž NSFW", elem_id="UnlearnDiffAtk-benchmark-tab-table", id=0):
files = ['nudity']
with gr.Row():
with gr.Column():
with gr.Row():
search_bar = gr.Textbox(
placeholder=" πŸ” Search for your model (separate multiple queries with `;`) and press ENTER...",
show_label=False,
elem_id="search-bar",
)
with gr.Row():
model1_column = gr.CheckboxGroup(
label="Evaluation Metrics",
choices=['Pre-ASR','Post-ASR','FID','CLIP-Score'],
interactive=True,
elem_id="column-select",
)
for i in range(len(files)):
if files[i] == 'nudity':
name = "### [Unlearned Concept]: "+" Nudity"
csv_path = './assets/'+files[i]+'.csv'
# elif files[i] == 'violence':
# name = "### Unlearned Concepts "+" Violence"
# csv_path = './assets/'+files[i]+'.csv'
# elif files[i] == 'illegal_activity':
# name = "### Unlearned Concepts "+" Illgal Activity"
# csv_path = './assets/'+files[i]+'.csv'
gr.Markdown(name)
df_results = load_data(csv_path)
df_results_init = df_results.copy()[show_columns]
leaderboard_table = gr.components.Dataframe(
value = df_results,
datatype = TYPES,
elem_id = "leaderboard-table",
interactive = False,
visible=True,
)
hidden_leaderboard_table_for_search = gr.components.Dataframe(
value=df_results_init,
# value=df_results,
interactive=False,
visible=False,
)
search_bar.submit(
update_table,
[
hidden_leaderboard_table_for_search,
model1_column,
search_bar,
],
leaderboard_table,
)
for selector in [model1_column]:
selector.change(
update_table,
[
hidden_leaderboard_table_for_search,
model1_column,
search_bar,
],
leaderboard_table,
)
with gr.TabItem("🎨 Style", elem_id="Style", id=1):
files = ['vangogh']
with gr.Row():
with gr.Column():
with gr.Row():
search_bar = gr.Textbox(
placeholder=" πŸ” Search for your model (separate multiple queries with `;`) and press ENTER...",
show_label=False,
elem_id="search-bar",
)
with gr.Row():
model1_column = gr.CheckboxGroup(
label="Evaluation Metrics",
choices=['Pre-ASR','Post-ASR','FID','CLIP-Score'],
interactive=True,
elem_id="column-select",
)
for i in range(len(files)):
if files[i] == 'vangogh':
name = "### [Unlearned Style]: "+" Van Gogh"
csv_path = './assets/'+files[i]+'.csv'
gr.Markdown(name)
df_results = load_data(csv_path)
df_results_init = df_results.copy()[show_columns]
leaderboard_table = gr.components.Dataframe(
value = df_results,
datatype = TYPES,
elem_id = "leaderboard-table",
interactive = False,
visible=True,
)
hidden_leaderboard_table_for_search = gr.components.Dataframe(
value=df_results_init,
# value=df_results,
interactive=False,
visible=False,
)
search_bar.submit(
update_table,
[
hidden_leaderboard_table_for_search,
model1_column,
search_bar,
],
leaderboard_table,
)
for selector in [model1_column]:
selector.change(
update_table,
[
hidden_leaderboard_table_for_search,
model1_column,
search_bar,
],
leaderboard_table,
)
with gr.TabItem("πŸͺ‚ Object", elem_id="UnlearnDiffAtk-benchmark-tab-table", id=2):
files = ['church','garbage','parachute','tench']
with gr.Row():
with gr.Column():
with gr.Row():
search_bar = gr.Textbox(
placeholder=" πŸ” Search for your model (separate multiple queries with `;`) and press ENTER...",
show_label=False,
elem_id="search-bar",
)
with gr.Row():
model1_column = gr.CheckboxGroup(
label="Evaluation Metrics",
choices=['Pre-ASR','Post-ASR','FID','CLIP-Score'],
interactive=True,
elem_id="column-select",
)
for i in range(len(files)):
if files[i] == "church":
name = "### [Unlearned Object]: "+" Church"
csv_path = './assets/'+files[i]+'.csv'
elif files[i] == 'garbage':
name = "### [Unlearned Object]: "+" Garbage"
csv_path = './assets/'+files[i]+'.csv'
elif files[i] == 'tench':
name = "### [Unlearned Object]: "+" Tench"
csv_path = './assets/'+files[i]+'.csv'
elif files[i] == 'parachute':
name = "### [Unlearned Object]: "+" Parachute"
csv_path = './assets/'+files[i]+'.csv'
gr.Markdown(name)
df_results = load_data(csv_path)
df_results_init = df_results.copy()[show_columns]
leaderboard_table = gr.components.Dataframe(
value = df_results,
datatype = TYPES,
elem_id = "leaderboard-table",
interactive = False,
visible=True,
)
hidden_leaderboard_table_for_search = gr.components.Dataframe(
value=df_results_init,
# value=df_results,
interactive=False,
visible=False,
)
search_bar.submit(
update_table,
[
hidden_leaderboard_table_for_search,
model1_column,
search_bar,
],
leaderboard_table,
)
for selector in [model1_column]:
selector.change(
update_table,
[
hidden_leaderboard_table_for_search,
model1_column,
search_bar,
],
leaderboard_table,
)
with gr.Row():
with gr.Accordion("πŸ“™ Citation", open=True):
citation_button = gr.Textbox(
value=CITATION_BUTTON_TEXT,
label=CITATION_BUTTON_LABEL,
lines=10,
elem_id="citation-button",
show_copy_button=True,
)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
demo.queue().launch(share=True)