Spaces:
Runtime error
Runtime error
from pydantic import NoneStr | |
import os | |
from langchain.chains.question_answering import load_qa_chain | |
from langchain.document_loaders import UnstructuredFileLoader | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.llms import OpenAI | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.vectorstores import FAISS | |
from langchain.vectorstores import Chroma | |
from langchain.chains import ConversationalRetrievalChain | |
import gradio as gr | |
import openai | |
from langchain import PromptTemplate, OpenAI, LLMChain | |
import validators | |
import requests | |
import mimetypes | |
import tempfile | |
import nltk | |
nltk.download('punkt') | |
class Chatbot: | |
def __init__(self): | |
openai.api_key = os.getenv("OPENAI_API_KEY") | |
def get_empty_state(self): | |
""" Create empty Knowledge base""" | |
return {"knowledge_base": None} | |
def create_knowledge_base(self,docs): | |
"""Create a knowledge base from the given documents. | |
Args: | |
docs (List[str]): List of documents. | |
Returns: | |
FAISS: Knowledge base built from the documents. | |
""" | |
# Initialize a CharacterTextSplitter to split the documents into chunks | |
# Each chunk has a maximum length of 500 characters | |
# There is no overlap between the chunks | |
text_splitter = CharacterTextSplitter( | |
separator="\n", chunk_size=1000, chunk_overlap=200, length_function=len | |
) | |
# Split the documents into chunks using the text_splitter | |
chunks = text_splitter.split_documents(docs) | |
# Initialize an OpenAIEmbeddings model to compute embeddings of the chunks | |
embeddings = OpenAIEmbeddings() | |
# Build a knowledge base using Chroma from the chunks and their embeddings | |
knowledge_base = Chroma.from_documents(chunks, embeddings) | |
# Return the resulting knowledge base | |
return knowledge_base | |
def upload_file(self,file_paths): | |
"""Upload a file and create a knowledge base from its contents. | |
Args: | |
file_paths : The files to uploaded. | |
Returns: | |
tuple: A tuple containing the file name and the knowledge base. | |
""" | |
file_paths = [i.name for i in file_paths] | |
print(file_paths) | |
loaders = [UnstructuredFileLoader(file_obj, strategy="fast") for file_obj in file_paths] | |
# Load the contents of the file using the loader | |
docs = [] | |
for loader in loaders: | |
docs.extend(loader.load()) | |
# Create a knowledge base from the loaded documents using the create_knowledge_base() method | |
knowledge_base = self.create_knowledge_base(docs) | |
# Return a tuple containing the file name and the knowledge base | |
return file_paths, {"knowledge_base": knowledge_base} | |
def add_text(self,history, text): | |
history = history + [(text, None)] | |
print("History for Add text : ",history) | |
return history, gr.update(value="", interactive=False) | |
def upload_multiple_urls(self,urls): | |
urlss = [url.strip() for url in urls.split(',')] | |
all_docs = [] | |
file_paths = [] | |
for url in urlss: | |
if validators.url(url): | |
headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',} | |
r = requests.get(url,headers=headers) | |
if r.status_code != 200: | |
raise ValueError("Check the url of your file; returned status code %s" % r.status_code) | |
content_type = r.headers.get("content-type") | |
file_extension = mimetypes.guess_extension(content_type) | |
temp_file = tempfile.NamedTemporaryFile(suffix=file_extension, delete=False) | |
temp_file.write(r.content) | |
file_path = temp_file.name | |
file_paths.append(file_path) | |
loaders = [UnstructuredFileLoader(file_obj, strategy="fast") for file_obj in file_paths] | |
# Load the contents of the file using the loader | |
docs = [] | |
for loader in loaders: | |
docs.extend(loader.load()) | |
# Create a knowledge base from the loaded documents using the create_knowledge_base() method | |
knowledge_base = self.create_knowledge_base(docs) | |
return file_paths,{"knowledge_base":knowledge_base} | |
def answer_question(self, question,history,state): | |
"""Answer a question based on the current knowledge base. | |
Args: | |
state (dict): The current state containing the knowledge base. | |
Returns: | |
str: The answer to the question. | |
""" | |
# Retrieve the knowledge base from the state dictionary | |
knowledge_base = state["knowledge_base"] | |
retriever = knowledge_base.as_retriever() | |
qa = ConversationalRetrievalChain.from_llm( | |
llm=OpenAI(temperature=0.1), | |
retriever=retriever, | |
return_source_documents=False) | |
# Set the question for which we want to find the answer | |
res = [] | |
question = history[-1][0] | |
for human, ai in history[:-1]: | |
pair = (human, ai) | |
res.append(pair) | |
chat_history = [] | |
query = question | |
result = qa({"question": query, "chat_history": chat_history}) | |
# Perform a similarity search on the knowledge base to retrieve relevant documents | |
response = result["answer"] | |
# Return the response as the answer to the question | |
history[-1][1] = response | |
print("History for QA : ",history) | |
return history | |
def clear_function(self,state): | |
state.clear() | |
# state = gr.State(self.get_empty_state()) | |
def gradio_interface(self): | |
"""Create the Gradio interface for the Chemical Identifier.""" | |
with gr.Blocks(css="style.css",theme='karthikeyan-adople/hudsonhayes-gray') as demo: | |
gr.HTML("""<div style='background-color:rgb(0,1,36); text-align:center;padding:40px; padding-top:0'> | |
<img class="leftimage" align="left" src="file=logo.png" alt="Image" width="210" height="210"> | |
<img class="rightimage" align="right" src="file=carnival.png" alt="Image" width="210" height="210"> | |
<h2 style="text-align:center; color:white; text-weight:bold;padding-top:90px; padding-botton:0">Procurement Digital Assistant</h2> | |
</div>""") | |
state = gr.State(self.get_empty_state()) | |
with gr.Column(elem_id="col-container"): | |
with gr.Accordion("Upload Files", open = False): | |
with gr.Row(elem_id="row-flex"): | |
with gr.Row(elem_id="row-flex"): | |
with gr.Column(scale=1,): | |
file_url = gr.Textbox(label='file url :',show_label=True, placeholder="") | |
with gr.Row(elem_id="row-flex"): | |
with gr.Column(scale=1): | |
file_output = gr.File() | |
with gr.Column(scale=1): | |
upload_button = gr.UploadButton("Browse File", file_types=[".txt", ".pdf", ".doc", ".docx"],file_count = "multiple") | |
with gr.Row(): | |
chatbot = gr.Chatbot([], elem_id="chatbot") | |
with gr.Row(): | |
txt = gr.Textbox(label = "Question",show_label=True,placeholder="Enter text and press Enter") | |
with gr.Row(): | |
clear_btn = gr.Button(value="Clear") | |
txt_msg = txt.submit(self.add_text, [chatbot, txt], [chatbot, txt], queue=False).then(self.answer_question, [txt, chatbot, state], chatbot) | |
txt_msg.then(lambda: gr.update(interactive=True), None, [txt], queue=False) | |
file_url.submit(self.upload_multiple_urls, file_url, [file_output, state]) | |
clear_btn.click(self.clear_function,[state],[]) | |
clear_btn.click(lambda: None, None, chatbot, queue=False) | |
upload_button.upload(self.upload_file, upload_button, [file_output,state]) | |
demo.queue().launch(debug=True) | |
if __name__=="__main__": | |
chatbot = Chatbot() | |
chatbot.gradio_interface() |