Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import os | |
import time | |
from datetime import datetime | |
import logging | |
import numpy as np | |
import pandas as pd | |
import spacy | |
from sentence_transformers import CrossEncoder | |
from litellm import completion | |
from tqdm import tqdm | |
import src.backend.util as util | |
import src.envs as envs | |
# Set up basic configuration for logging | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s') | |
# Load spacy model for word tokenization | |
nlp = spacy.load("en_core_web_sm") | |
os.environ["HUGGINGFACE_API_KEY"] = envs.TOKEN | |
def load_evaluation_model(model_path): | |
"""Load the evaluation model from the given path | |
Args: | |
model_path (str): Path to the evaluation model | |
Returns: | |
CrossEncoder: The evaluation model | |
""" | |
model = CrossEncoder(model_path) | |
return model | |
def generate_summary(model: str, system_prompt: str, user_prompt: str, api_base: str): | |
response = completion( | |
model=model, | |
messages=[{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt}], | |
temperature=0.0, | |
max_tokens=1024, | |
api_base=api_base, | |
) | |
return response['choices'][0]['message']['content'] | |
class ModelLoadingException(Exception): | |
"""Exception raised for errors in loading a model. | |
Attributes: | |
model_id (str): The model identifier. | |
revision (str): The model revision. | |
""" | |
def __init__(self, model_id, revision, messages="Error initializing model"): | |
self.model_id = model_id | |
self.revision = revision | |
super().__init__(f"{messages} id={model_id} revision={revision}") | |
class SummaryGenerator: | |
"""A class to generate summaries using a causal language model. | |
Attributes: | |
model (str): huggingface/{model_id} | |
api_base (str): https://api-inference.huggingface.co/models/{model_id} | |
summaries_df (DataFrame): DataFrame to store generated summaries. | |
revision (str): Model revision. | |
avg_length (float): Average length of summaries. | |
answer_rate (float): Rate of non-empty summaries. | |
""" | |
def __init__(self, model_id, revision): | |
""" | |
Initializes the SummaryGenerator with a model. | |
Args: | |
model_id (str): Identifier for the model. | |
revision (str): Revision of the model. | |
""" | |
self.model = f"huggingface/{model_id}" | |
self.api_base = f"https://api-inference.huggingface.co/models/{model_id}" | |
self.summaries_df = pd.DataFrame() | |
self.revision = revision | |
self.avg_length = None | |
self.answer_rate = None | |
self.exceptions = None | |
def generate_summaries(self, df): | |
"""Generate summaries for a given DataFrame of source docs. | |
Args: | |
df (DataFrame): DataFrame containing source docs. | |
Returns: | |
summaries_df (DataFrame): Generated summaries by the model. | |
""" | |
source, summary, dataset = [], [], [] | |
exceptions = [] | |
for index, row in tqdm(df.iterrows(), total=df.shape[0]): | |
_source = row['text'] | |
_dataset = row['dataset'] | |
system_prompt = envs.SYSTEM_PROMPT | |
user_prompt = f"{envs.USER_PROMPT}\nPassage:\n{_source}" | |
while True: | |
try: | |
_summary = generate_summary(self.model, system_prompt, | |
user_prompt, self.api_base) | |
break | |
except Exception as e: | |
if 'Rate limit reached' in str(e): | |
wait_time = 3660 | |
current_time = datetime.now().strftime('%H:%M:%S') | |
print(f"Rate limit hit at {current_time}. Waiting for 1 hour before retrying...") | |
time.sleep(wait_time) | |
else: | |
print(f"Error at index {index}: {e}") | |
_summary = "" | |
exceptions.append(index) | |
break | |
summary.append(_summary) | |
source.append(_source) | |
dataset.append(_dataset) | |
# Sleep to prevent hitting rate limits too frequently | |
time.sleep(1) | |
self.summaries_df = pd.DataFrame(list(zip(source, summary, dataset)), | |
columns=["source", "summary", "dataset"]) | |
self.exceptions = exceptions | |
self._compute_avg_length() | |
self._compute_answer_rate() | |
return self.summaries_df | |
def _compute_avg_length(self): | |
""" | |
Compute the average length of non-empty summaries using SpaCy. | |
""" | |
total_word_count = 0 | |
total_count = 0 | |
for summary in self.summaries_df['summary']: | |
if util.is_summary_valid(summary): | |
doc = nlp(summary) | |
words = [token.text for token in doc if token.is_alpha] | |
total_word_count += len(words) | |
total_count += 1 | |
self.avg_length = 0 if total_count == 0 else total_word_count / total_count | |
def _compute_answer_rate(self): | |
""" | |
Compute the rate of non-empty summaries. | |
""" | |
valid_count = sum(1 for summary in self.summaries_df['summary'] | |
if util.is_summary_valid(summary)) | |
total_count = len(self.summaries_df) | |
self.answer_rate = 0 if total_count == 0 else valid_count / total_count | |
class EvaluationModel: | |
"""A class to evaluate generated summaries. | |
Attributes: | |
model (CrossEncoder): The evaluation model. | |
scores (list): List of evaluation scores. | |
accuracy (float): Accuracy of the summaries. | |
hallucination_rate (float): Rate of hallucination in summaries. | |
""" | |
def __init__(self, model_path): | |
""" | |
Initializes the EvaluationModel with a CrossEncoder model. | |
Args: | |
model_path (str): Path to the CrossEncoder model. | |
""" | |
self.model = load_evaluation_model(model_path) | |
self.scores = [] | |
self.factual_consistency_rate = None | |
self.hallucination_rate = None | |
def evaluate_hallucination(self, summaries_df): | |
""" | |
Evaluate the hallucination rate in summaries. Updates the 'scores' attribute | |
of the instance with the computed scores. | |
Args: | |
summaries_df (DataFrame): DataFrame containing source docs and summaries. | |
Returns: | |
list: List of hallucination scores. Also updates the 'scores' attribute of the instance. | |
""" | |
hem_scores = [] | |
source_summary_pairs = util.create_pairs(summaries_df) | |
for doc, summary in tqdm(source_summary_pairs, desc="Evaluating hallucinations"): | |
if util.is_summary_valid(summary): | |
try: | |
score = self.model.predict([doc, summary])[0] | |
if not isinstance(score, float): | |
logging.warning(f"Score type mismatch: Expected float, got {type(score)}.") | |
continue | |
hem_scores.append(score) | |
except Exception as e: | |
logging.error(f"Error while running HEM: {e}") | |
raise | |
self.scores = hem_scores | |
return hem_scores | |
def compute_factual_consistency_rate(self, threshold=0.5): | |
""" | |
Compute the factual consistency rate of the evaluated summaries based on | |
the previously calculated scores. This method relies on the 'scores' | |
attribute being populated, typically via the 'evaluate_hallucination' method. | |
Returns: | |
float: Factual Consistency Rate. Also updates the 'factual_consistency_rate' | |
and 'hallucination_rate' attributes of the instance. | |
Raises: | |
ValueError: If scores have not been calculated prior to calling this method. | |
""" | |
if not self.scores: | |
error_msg = "Scores not calculated. Call evaluate_hallucination() first." | |
logging.error(error_msg) | |
raise ValueError(error_msg) | |
# Use threshold of 0.5 to compute factual_consistency_rate | |
num_above_threshold = sum(score >= threshold for score in self.scores) | |
num_total = len(self.scores) | |
if not num_total: | |
raise ValueError("No scores available to compute factual consistency rate.") | |
self.factual_consistency_rate = (num_above_threshold / num_total) * 100 | |
self.hallucination_rate = 100 - self.factual_consistency_rate | |
return self.factual_consistency_rate | |