import os
import time
from datetime import datetime
import logging

import numpy as np
import pandas as pd
import spacy
from sentence_transformers import CrossEncoder
from litellm import completion
from tqdm import tqdm

import src.backend.util as util
import src.envs as envs

# Set up basic configuration for logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Load spacy model for word tokenization
nlp = spacy.load("en_core_web_sm")

os.environ["HUGGINGFACE_API_KEY"] = envs.TOKEN

# Seconds to sleep after a rate-limit error before retrying the same request
# (61 minutes; the log message rounds this to "1 hour").
_RATE_LIMIT_WAIT_SECONDS = 3660

# Seconds to sleep between consecutive requests to avoid hitting rate limits
# too frequently.
_REQUEST_INTERVAL_SECONDS = 1


def load_evaluation_model(model_path):
    """Load the evaluation model from the given path.

    Args:
        model_path (str): Path to the evaluation model.

    Returns:
        CrossEncoder: The evaluation model.
    """
    return CrossEncoder(model_path)


def generate_summary(model: str, system_prompt: str, user_prompt: str,
                     api_base: str) -> str:
    """Request a single completion and return the text of its first choice.

    Args:
        model (str): Model name passed to ``completion``.
        system_prompt (str): Content of the system message.
        user_prompt (str): Content of the user message.
        api_base (str): Base URL passed to ``completion``.

    Returns:
        str: The message content of the first returned choice.
    """
    response = completion(
        model=model,
        messages=[{"role": "system", "content": system_prompt},
                  {"role": "user", "content": user_prompt}],
        temperature=0.0,
        max_tokens=1024,
        api_base=api_base,
    )
    return response['choices'][0]['message']['content']


class ModelLoadingException(Exception):
    """Exception raised for errors in loading a model.

    Attributes:
        model_id (str): The model identifier.
        revision (str): The model revision.
    """

    def __init__(self, model_id, revision, messages="Error initializing model"):
        self.model_id = model_id
        self.revision = revision
        super().__init__(f"{messages} id={model_id} revision={revision}")


class SummaryGenerator:
    """A class to generate summaries using a causal language model.

    Attributes:
        model (str): huggingface/{model_id}
        api_base (str): https://api-inference.huggingface.co/models/{model_id}
        summaries_df (DataFrame): DataFrame to store generated summaries.
        revision (str): Model revision.
        avg_length (float): Average length of summaries.
        answer_rate (float): Rate of non-empty summaries.
        exceptions (list): Row indices whose summary request failed with a
            non-rate-limit error; None until generate_summaries() has run.
    """

    def __init__(self, model_id, revision):
        """
        Initializes the SummaryGenerator with a model.

        Args:
            model_id (str): Identifier for the model.
            revision (str): Revision of the model.
        """
        self.model = f"huggingface/{model_id}"
        self.api_base = f"https://api-inference.huggingface.co/models/{model_id}"
        self.summaries_df = pd.DataFrame()
        self.revision = revision
        self.avg_length = None
        self.answer_rate = None
        self.exceptions = None

    def generate_summaries(self, df):
        """Generate summaries for a given DataFrame of source docs.

        Each row's 'text' is sent to the model. A rate-limit error makes the
        request wait and retry; any other error records an empty summary and
        the row index in ``self.exceptions``. Also refreshes ``avg_length``
        and ``answer_rate``.

        Args:
            df (DataFrame): DataFrame containing source docs. The 'text' and
                'dataset' columns are read.

        Returns:
            summaries_df (DataFrame): Generated summaries by the model, with
                columns "source", "summary" and "dataset".
        """
        source, summary, dataset = [], [], []
        exceptions = []

        # The system prompt does not depend on the row, so read it once.
        system_prompt = envs.SYSTEM_PROMPT

        for index, row in tqdm(df.iterrows(), total=df.shape[0]):
            _source = row['text']
            _dataset = row['dataset']
            user_prompt = f"{envs.USER_PROMPT}\nPassage:\n{_source}"

            # Retry the same row until it succeeds or fails with an error
            # that is not a rate limit.
            while True:
                try:
                    _summary = generate_summary(self.model, system_prompt,
                                                user_prompt, self.api_base)
                    break
                except Exception as e:
                    if 'Rate limit reached' in str(e):
                        current_time = datetime.now().strftime('%H:%M:%S')
                        logging.warning(
                            "Rate limit hit at %s. Waiting for 1 hour before retrying...",
                            current_time,
                        )
                        time.sleep(_RATE_LIMIT_WAIT_SECONDS)
                    else:
                        logging.error("Error at index %s: %s", index, e)
                        _summary = ""
                        exceptions.append(index)
                        break

            summary.append(_summary)
            source.append(_source)
            dataset.append(_dataset)

            # Sleep to prevent hitting rate limits too frequently
            time.sleep(_REQUEST_INTERVAL_SECONDS)

        self.summaries_df = pd.DataFrame(list(zip(source, summary, dataset)),
                                         columns=["source", "summary", "dataset"])
        self.exceptions = exceptions
        self._compute_avg_length()
        self._compute_answer_rate()

        return self.summaries_df

    def _compute_avg_length(self):
        """
        Compute the average length of non-empty summaries using SpaCy.

        Only summaries accepted by util.is_summary_valid are counted, and
        only alphabetic tokens contribute to a summary's word count. The
        result is stored in ``self.avg_length`` (0 when nothing is valid).
        """
        total_word_count = 0
        total_count = 0

        for summary in self.summaries_df['summary']:
            if util.is_summary_valid(summary):
                doc = nlp(summary)
                total_word_count += sum(1 for token in doc if token.is_alpha)
                total_count += 1

        self.avg_length = 0 if total_count == 0 else total_word_count / total_count

    def _compute_answer_rate(self):
        """
        Compute the rate of non-empty summaries.

        The result is the fraction of rows whose summary is accepted by
        util.is_summary_valid, stored in ``self.answer_rate`` (0 when the
        DataFrame is empty).
        """
        valid_count = sum(1 for summary in self.summaries_df['summary']
                          if util.is_summary_valid(summary))
        total_count = len(self.summaries_df)

        self.answer_rate = 0 if total_count == 0 else valid_count / total_count


class EvaluationModel:
    """A class to evaluate generated summaries.

    Attributes:
        model (CrossEncoder): The evaluation model.
        scores (list): List of evaluation scores.
        factual_consistency_rate (float): Percentage of scores at or above
            the threshold; None until computed.
        hallucination_rate (float): 100 minus factual_consistency_rate; None
            until computed.
    """

    def __init__(self, model_path):
        """
        Initializes the EvaluationModel with a CrossEncoder model.

        Args:
            model_path (str): Path to the CrossEncoder model.
        """
        self.model = load_evaluation_model(model_path)
        self.scores = []
        self.factual_consistency_rate = None
        self.hallucination_rate = None

    def evaluate_hallucination(self, summaries_df):
        """
        Evaluate the hallucination rate in summaries. Updates the 'scores'
        attribute of the instance with the computed scores.

        Pairs whose summary is rejected by util.is_summary_valid are skipped.

        Args:
            summaries_df (DataFrame): DataFrame containing source docs and
                summaries.

        Returns:
            list: List of hallucination scores as Python floats. Also updates
                the 'scores' attribute of the instance.

        Raises:
            Exception: Any error raised by the model's predict() call is
                logged and re-raised.
        """
        hem_scores = []
        source_summary_pairs = util.create_pairs(summaries_df)

        for doc, summary in tqdm(source_summary_pairs, desc="Evaluating hallucinations"):
            if not util.is_summary_valid(summary):
                continue

            try:
                raw_score = self.model.predict([doc, summary])
            except Exception as e:
                logging.error(f"Error while running HEM: {e}")
                raise

            # NOTE(review): predict() appears to return either a scalar or an
            # array for a single pair depending on the library version and
            # model head -- confirm. np.ravel accepts both, and float()
            # accepts NumPy scalars (e.g. np.float32) that a strict
            # isinstance(score, float) check would reject.
            try:
                score = float(np.ravel(raw_score)[0])
            except (TypeError, ValueError, IndexError):
                logging.warning(
                    "Score type mismatch: Expected a numeric score, got %s.",
                    type(raw_score),
                )
                continue

            hem_scores.append(score)

        self.scores = hem_scores
        return hem_scores

    def compute_factual_consistency_rate(self, threshold=0.5):
        """
        Compute the factual consistency rate of the evaluated summaries based
        on the previously calculated scores. This method relies on the
        'scores' attribute being populated, typically via the
        'evaluate_hallucination' method.

        Args:
            threshold (float): Scores greater than or equal to this value
                count as factually consistent. Defaults to 0.5.

        Returns:
            float: Factual Consistency Rate as a percentage. Also updates the
                'factual_consistency_rate' and 'hallucination_rate'
                attributes of the instance.

        Raises:
            ValueError: If scores have not been calculated prior to calling
                this method.
        """
        if not self.scores:
            error_msg = "Scores not calculated. Call evaluate_hallucination() first."
            logging.error(error_msg)
            raise ValueError(error_msg)

        # self.scores is non-empty here, so the division below is safe.
        num_above_threshold = sum(score >= threshold for score in self.scores)
        num_total = len(self.scores)

        self.factual_consistency_rate = (num_above_threshold / num_total) * 100
        self.hallucination_rate = 100 - self.factual_consistency_rate

        return self.factual_consistency_rate