import datetime import uuid import openai from langchain.embeddings import OpenAIEmbeddings from langchain.vectorstores import Chroma from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.embeddings import SentenceTransformerEmbeddings import os from langchain.document_loaders import WebBaseLoader, TextLoader, Docx2txtLoader, PyMuPDFLoader from whatsapp_chat_custom import WhatsAppChatLoader # use this instead of from langchain.document_loaders import WhatsAppChatLoader from collections import deque import re from bs4 import BeautifulSoup import requests from urllib.parse import urlparse import mimetypes from pathlib import Path import tiktoken # Regex pattern to match a URL HTTP_URL_PATTERN = r'^http[s]*://.+' mimetypes.init() media_files = tuple([x for x in mimetypes.types_map if mimetypes.types_map[x].split('/')[0] in ['image', 'video', 'audio']]) filter_strings = ['/email-protection#'] def getOaiCreds(key): if key: return {'service': 'openai', 'oai_key' : key } else: return {} def getWxCreds(key, p_id): if key and p_id: return {'service': 'watsonx', 'credentials' : {"url": "https://us-south.ml.cloud.ibm.com", "apikey": key }, 'project_id': p_id } else: return {} def getPersonalBotApiKey(): if os.getenv("OPENAI_API_KEY"): return getOaiCreds(os.getenv("OPENAI_API_KEY")) elif os.getenv("WX_API_KEY") and os.getenv("WX_PROJECT_ID"): return getWxCreds(os.getenv("WX_API_KEY"), os.getenv("WX_PROJECT_ID")) else: return {} def get_hyperlinks(url): try: reqs = requests.get(url) if not reqs.headers.get('Content-Type').startswith("text/html") or 400<=reqs.status_code<600: return [] soup = BeautifulSoup(reqs.text, 'html.parser') except Exception as e: print(e) return [] hyperlinks = [] for link in soup.find_all('a', href=True): hyperlinks.append(link.get('href')) return hyperlinks # Function to get the hyperlinks from a URL that are within the same domain def get_domain_hyperlinks(local_domain, url): clean_links = [] for link in set(get_hyperlinks(url)): clean_link = None # If the link is a URL, check if it is within the same domain if re.search(HTTP_URL_PATTERN, link): # Parse the URL and check if the domain is the same url_obj = urlparse(link) if url_obj.netloc.replace('www.','') == local_domain.replace('www.',''): clean_link = link # If the link is not a URL, check if it is a relative link else: if link.startswith("/"): link = link[1:] elif link.startswith(("#", '?', 'mailto:')): continue if 'wp-content/uploads' in url: clean_link = url+ "/" + link else: clean_link = "https://" + local_domain + "/" + link if clean_link is not None: clean_link = clean_link.strip().rstrip('/').replace('/../', '/') if not any(x in clean_link for x in filter_strings): clean_links.append(clean_link) # Return the list of hyperlinks that are within the same domain return list(set(clean_links)) # this function will get you a list of all the URLs from the base URL def crawl(url, local_domain, prog=None): # Create a queue to store the URLs to crawl queue = deque([url]) # Create a set to store the URLs that have already been seen (no duplicates) seen = set([url]) # While the queue is not empty, continue crawling while queue: # Get the next URL from the queue url_pop = queue.pop() # Get the hyperlinks from the URL and add them to the queue for link in get_domain_hyperlinks(local_domain, url_pop): if link not in seen: queue.append(link) seen.add(link) if len(seen)>=100: return seen if prog is not None: prog(1, desc=f'Crawling: {url_pop}') return seen def ingestURL(documents, url, crawling=True, prog=None): url = url.rstrip('/') # Parse the URL and get the domain local_domain = urlparse(url).netloc if not (local_domain and url.startswith('http')): return documents print('Loading URL', url) if crawling: # crawl to get other webpages from this URL if prog is not None: prog(0, desc=f'Crawling: {url}') links = crawl(url, local_domain, prog) if prog is not None: prog(1, desc=f'Crawling: {url}') else: links = set([url]) # separate pdf and other links c_links, pdf_links = [], [] for x in links: if x.endswith('.pdf'): pdf_links.append(x) elif not x.endswith(media_files): c_links.append(x) # Clean links loader using WebBaseLoader if prog is not None: prog(0.5, desc=f'Ingesting: {url}') if c_links: loader = WebBaseLoader(list(c_links)) documents.extend(loader.load()) # remote PDFs loader for pdf_link in list(pdf_links): loader = PyMuPDFLoader(pdf_link) doc = loader.load() for x in doc: x.metadata['source'] = loader.source documents.extend(doc) return documents def ingestFiles(documents, files_list, prog=None): for fPath in files_list: doc = None if fPath.endswith('.pdf'): doc = PyMuPDFLoader(fPath).load() elif fPath.endswith('.txt') and not 'WhatsApp Chat with' in fPath: doc = TextLoader(fPath).load() elif fPath.endswith(('.doc', 'docx')): doc = Docx2txtLoader(fPath).load() elif 'WhatsApp Chat with' in fPath and fPath.endswith('.csv'): # Convert Whatsapp TXT files to CSV using https://whatstk.streamlit.app/ doc = WhatsAppChatLoader(fPath).load() else: pass if doc is not None and doc[0].page_content: if prog is not None: prog(1, desc='Loaded file: '+fPath.rsplit('/')[0]) print('Loaded file:', fPath) documents.extend(doc) return documents def data_ingestion(inputDir=None, file_list=[], url_list=[], prog=None): documents = [] # Ingestion from Input Directory if inputDir is not None: files = [str(x) for x in Path(inputDir).glob('**/*')] documents = ingestFiles(documents, files) if file_list: documents = ingestFiles(documents, file_list, prog) # Ingestion from URLs - also try https://python.langchain.com/docs/integrations/document_loaders/recursive_url_loader if url_list: for url in url_list: documents = ingestURL(documents, url, prog=prog) # Cleanup documents for x in documents: if 'WhatsApp Chat with' not in x.metadata['source']: x.page_content = x.page_content.strip().replace('\n', ' ').replace('\\n', ' ').replace(' ', ' ') # print(f"Total number of documents: {len(documents)}") return documents def split_docs(documents): # Splitting and Chunks text_splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=250) # default chunk size of 4000 makes around 1k tokens per doc. with k=4, this means 4k tokens input to LLM. docs = text_splitter.split_documents(documents) return docs def getSourcesFromMetadata(metadata, sourceOnly=True, sepFileUrl=True): # metadata: list of metadata dict from all documents setSrc = set() for x in metadata: metadataText = '' # we need to convert each metadata dict into a string format. This string will be added to a set if x is not None: # extract source first, and then extract all other items source = x['source'] source = source.rsplit('/',1)[-1] if 'http' not in source else source notSource = [] for k,v in x.items(): if v is not None and k!='source' and k in ['page', 'title']: notSource.extend([f"{k}: {v}"]) metadataText = ', '.join([f'source: {source}'] + notSource) if sourceOnly==False else source setSrc.add(metadataText) if sepFileUrl: src_files = '\n'.join(([f"{i+1}) {x}" for i,x in enumerate(sorted([x for x in setSrc if 'http' not in x], key=str.casefold))])) src_urls = '\n'.join(([f"{i+1}) {x}" for i,x in enumerate(sorted([x for x in setSrc if 'http' in x], key=str.casefold))])) src_files = 'Files:\n'+src_files if src_files else '' src_urls = 'URLs:\n'+src_urls if src_urls else '' newLineSep = '\n\n' if src_files and src_urls else '' return src_files + newLineSep + src_urls , len(setSrc) else: src_docs = '\n'.join(([f"{i+1}) {x}" for i,x in enumerate(sorted(list(setSrc), key=str.casefold))])) return src_docs, len(setSrc) def getEmbeddingFunc(creds): # OpenAI key used if creds.get('service')=='openai': embeddings = OpenAIEmbeddings(openai_api_key=creds.get('oai_key','Null')) # WX key used elif creds.get('service')=='watsonx': # testModel = Model(model_id=ModelTypes.FLAN_UL2, credentials=creds['credentials'], project_id=creds['project_id']) # test the API key # del testModel embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") # for now use OpenSource model for embedding as WX doesnt have any embedding model else: raise Exception('Error: Invalid or None Credentials') return embeddings def getVsDict(embeddingFunc, docs, vsDict={}): # create chroma client if doesnt exist if vsDict.get('chromaClient') is None: vsDict['chromaDir'] = './vecstore/'+str(uuid.uuid1()) vsDict['chromaClient'] = Chroma(embedding_function=embeddingFunc, persist_directory=vsDict['chromaDir']) # clear chroma client before adding new docs if vsDict['chromaClient']._collection.count()>0: vsDict['chromaClient'].delete(vsDict['chromaClient'].get()['ids']) # add new docs to chroma client vsDict['chromaClient'].add_documents(docs) print('vectorstore count:',vsDict['chromaClient']._collection.count(), 'at', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) return vsDict # used for Hardcoded documents only - not uploaded by user (userData_vecStore is separate function) def localData_vecStore(embKey={}, inputDir=None, file_list=[], url_list=[], vsDict={}): documents = data_ingestion(inputDir, file_list, url_list) if not documents: raise Exception('Error: No Documents Found') docs = split_docs(documents) # Embeddings embeddings = getEmbeddingFunc(embKey) # create chroma client if doesnt exist vsDict_hd = getVsDict(embeddings, docs, vsDict) # get sources from metadata src_str = getSourcesFromMetadata(vsDict_hd['chromaClient'].get()['metadatas']) src_str = str(src_str[1]) + ' source document(s) successfully loaded in vector store.'+'\n\n' + src_str[0] print(src_str) return vsDict_hd def num_tokens_from_string(string, encoding_name = "cl100k_base"): """Returns the number of tokens in a text string.""" encoding = tiktoken.get_encoding(encoding_name) num_tokens = len(encoding.encode(string)) return num_tokens