import os
import re
import struct
import binascii
import datetime
import csv
import json
import requests
from transformers import AutoTokenizer, AutoModel
import torch
from torch import Tensor
import torch.nn.functional as F
import numpy as np
from scipy.spatial.distance import cdist
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup

model_name = "intfloat/multilingual-e5-large"
input_dir = 'input'
vectors_dir = 'vectors'
model = None
tokenizer = None
device = None
vectors = {}

os.makedirs(input_dir, exist_ok=True)
os.makedirs(vectors_dir, exist_ok=True)


def ddg(text, max_results=5):
    # Run a DuckDuckGo text search and return the raw result dicts.
    with DDGS() as ddgs:
        results = [r for r in ddgs.text(text, max_results=max_results)]
        print(results)
        return results


def bs4(url):
    # Fetch a page and return its visible text, one chunk per line.
    html = requests.get(url).text
    soup = BeautifulSoup(html, features="html.parser")
    # kill all script and style elements
    for script in soup(["script", "style"]):
        script.extract()  # rip it out
    # get text
    text = soup.get_text()
    # break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # break multi-headlines into a line each
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text


def upload(name, filename, content):
    # Save an input CSV under input/<name>/<filename>.
    os.makedirs(f"{input_dir}/{name}", exist_ok=True)
    srcpath = f"{input_dir}/{name}/{filename}"
    with open(srcpath, 'w', encoding='utf-8') as f:
        f.write(content)


def delete(name, filename):
    # Remove a file from both the input and vectors directories.
    srcpath = f"{input_dir}/{name}/{filename}"
    dstpath = f"{vectors_dir}/{name}/{filename}"
    if os.path.exists(srcpath):
        os.unlink(srcpath)
    if os.path.exists(dstpath):
        os.unlink(dstpath)


def load_model():
    global model, tokenizer, device
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Check whether CUDA is available and use it if so
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    # Move the model to the selected device
    model = AutoModel.from_pretrained(model_name).to(device)


def average_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
    # Mean-pool the token embeddings, ignoring padding positions.
    last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
    return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]


def cosine_similarity(v1, v2):
    return 1 - cdist([v1], [v2], 'cosine')[0][0]


def embedding():
    # Vectorize every input file that does not have a vectors file yet.
    for name in os.listdir(input_dir):
        os.makedirs(f"{input_dir}/{name}", exist_ok=True)
        os.makedirs(f"{vectors_dir}/{name}", exist_ok=True)
        for filename in os.listdir(f"{input_dir}/{name}"):
            embedding_file(name, filename)


def embedding_file(name, filename):
    srcpath = f"{input_dir}/{name}/{filename}"
    dstpath = f"{vectors_dir}/{name}/{filename}"
    if os.path.isdir(srcpath):
        return
    if os.path.exists(dstpath):
        return
    print(srcpath)
    chunks = []
    with open(srcpath, 'r', encoding='utf-8') as csv_file:
        reader = csv.reader(csv_file)
        for r in reader:
            if not r:
                continue
            if r[0] == 'chunk':  # header
                continue
            if len(r) == 1:
                r.append('')
            chunks.append(r)
    # Open the output CSV and write one row per chunk with its vector
    with open(dstpath, mode='w', encoding='utf-8', newline='') as csv_file:
        fieldnames = ['chunk', 'output', 'vector']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for r in chunks:
            writer.writerow({'chunk': r[0], 'output': r[1], 'vector': get_vector_string(r[0])})


def get_vector_string(chunk):
    global model, tokenizer, device
    inputs = tokenizer(chunk, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
    with torch.no_grad():  # no gradient computation needed for inference
        outputs = model(**inputs)
    embeddings = average_pool(outputs.last_hidden_state, inputs['attention_mask'])
    embeddings = F.normalize(embeddings, p=2, dim=1)
    # Convert the vector to a string: each float64 is packed big-endian and only
    # the top 9 hex digits are kept (a compact but lossy representation)
    vector_string = ",".join([hex(struct.unpack('>Q', struct.pack('>d', x))[0])[2:-7] for x in embeddings[0].cpu().numpy()])
    return vector_string


def load_vectors():
    global vectors
    vectors = {}
    for name in os.listdir(vectors_dir):
        vectors[name] = []
        for filename in os.listdir(f"{vectors_dir}/{name}"):
            filepath = f"{vectors_dir}/{name}/{filename}"
            with open(filepath, mode='r', encoding='utf-8') as csv_file:
                reader = csv.DictReader(csv_file)
                for row in reader:
                    # Pad each truncated hex value back to 16 digits and unpack it as a float64
                    vector = np.array([struct.unpack('>d', binascii.unhexlify(x + '0000000'))[0] for x in row['vector'].split(',')])
                    vectors[name].append([row['chunk'], row['output'], vector])


def search(name, query_text, num=3):
    dt = datetime.datetime.now()
    # Embed the query text
    inputs = tokenizer(query_text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
    query_embeddings = average_pool(outputs.last_hidden_state, inputs['attention_mask'])
    query_embeddings = F.normalize(query_embeddings, p=2, dim=1).cpu().numpy()[0]
    # Compute the similarity between the query and every stored record
    similarities = []
    for row in vectors[name]:
        similarity = cosine_similarity(query_embeddings, row[2])
        similarities.append((row, similarity))
    # Sort by similarity and keep the top num results
    top_matches = sorted(similarities, key=lambda x: x[1], reverse=True)[:num]
    result = ''
    for i, (row, similarity) in enumerate(top_matches, 1):
        if not row[1]:
            row[1] = row[0]
        result += f"#{i} {similarity*100:.2f}%\n{row[1]}\n\n"
    print(result)
    print(datetime.datetime.now() - dt)
    return result


load_model()
load_vectors()

if __name__ == '__main__':
    embedding()
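

# A minimal usage sketch, not part of the original script: the collection name
# "docs", the file name "sample.csv", and the query text below are illustrative
# placeholders only. It shows the intended flow: upload a chunk/output CSV,
# vectorize it, reload the stored vectors, then run a similarity search.
def _example_usage():
    upload('docs', 'sample.csv', 'chunk,output\n"What is E5?","E5 is a multilingual text embedding model."\n')
    embedding()       # write input/docs/sample.csv vectors to vectors/docs/sample.csv
    load_vectors()    # reload the stored vectors into memory
    print(search('docs', 'Tell me about E5', num=3))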