import os
import re
import struct
import binascii
import datetime
import csv
import json
import requests
from transformers import AutoTokenizer, AutoModel
import torch
from torch import Tensor
import torch.nn.functional as F
import numpy as np
from scipy.spatial.distance import cdist
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup

model_name = "intfloat/multilingual-e5-large"
input_dir = 'input'
vectors_dir = 'vectors'

model = None
tokenizer = None
device = None
vectors = {}

os.makedirs(input_dir, exist_ok=True)
os.makedirs(vectors_dir, exist_ok=True)
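
# Directory layout (assumed from the code below): each knowledge base gets its own
# subfolder under both directories:
#   input/<name>/<file>.csv    -> source chunks, columns: chunk[, output]
#   vectors/<name>/<file>.csv  -> embedded chunks, columns: chunk, output, vector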

def ddg(text, max_results=5):
    with DDGS() as ddgs:
        results = [r for r in ddgs.text(text, max_results=max_results)]
    print(results)
    return results
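
# Example (illustrative only): duckduckgo_search returns a list of result dicts,
# typically with 'title', 'href' and 'body' keys:
#   ddg("multilingual e5 embeddings", max_results=3)
#   -> [{'title': '...', 'href': 'https://...', 'body': '...'}, ...]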

def bs4(url):
    html = requests.get(url).text
    soup = BeautifulSoup(html, features="html.parser")
    # kill all script and style elements
    for script in soup(["script", "style"]):
        script.extract()  # rip it out
    # get text
    text = soup.get_text()
    # break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # break multi-headlines into a line each
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text
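
# Example (illustrative): fetch a page and strip it down to visible text, e.g. to
# feed search results into the chunking/upload step:
#   for r in ddg("some topic", max_results=3):
#       page_text = bs4(r['href'])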

def upload(name, filename, content):
    os.makedirs(f"{input_dir}/{name}", exist_ok=True)
    srcpath = f"{input_dir}/{name}/{filename}"
    with open(srcpath, 'w', encoding='utf-8') as f:
        f.write(content)

def delete(name, filename):
    srcpath = f"{input_dir}/{name}/{filename}"
    dstpath = f"{vectors_dir}/{name}/{filename}"
    if os.path.exists(srcpath):
        os.unlink(srcpath)
    if os.path.exists(dstpath):
        os.unlink(dstpath)
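
# Example (illustrative; "demo" and "faq.csv" are hypothetical names): upload() writes
# the raw CSV text, and embedding_file() below expects a 'chunk' header plus an optional
# second 'output' column:
#   upload("demo", "faq.csv", "chunk,output\nWhat are the opening hours?,Open 9:00-18:00\n")
#   delete("demo", "faq.csv")  # removes both the source file and its cached vectors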

def load_model():
    global model, tokenizer, device
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Check whether CUDA is available and set the device to CUDA if it is
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    # Move the model to the device
    model = AutoModel.from_pretrained(model_name).to(device)

def average_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
    last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
    return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]

def cosine_similarity(v1, v2):
    return 1 - cdist([v1], [v2], 'cosine')[0][0]
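
# Note: average_pool() is the mean pooling over non-padding tokens that the E5 model
# card recommends. Because the vectors are L2-normalized afterwards, cosine similarity
# reduces to a plain dot product. Illustrative check with hypothetical unit vectors:
#   a = np.array([0.6, 0.8])
#   b = np.array([0.8, 0.6])
#   cosine_similarity(a, b)   # -> 0.96, the same as np.dot(a, b)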

def embedding():
    for name in os.listdir(input_dir):
        os.makedirs(f"{input_dir}/{name}", exist_ok=True)
        os.makedirs(f"{vectors_dir}/{name}", exist_ok=True)
        for filename in os.listdir(f"{input_dir}/{name}"):
            embedding_file(name, filename)

def embedding_file(name, filename):
    srcpath = f"{input_dir}/{name}/{filename}"
    dstpath = f"{vectors_dir}/{name}/{filename}"
    if os.path.isdir(srcpath):
        return
    if os.path.exists(dstpath):
        return
    print(srcpath)
    chunks = []
    with open(srcpath, 'r', encoding='utf-8') as csv_file:
        reader = csv.reader(csv_file)
        for r in reader:
            if not r:
                continue
            if r[0] == 'chunk':  # skip the header row
                continue
            if len(r) == 1:
                r.append('')
            chunks.append(r)
    # Open the output CSV file and write one embedded row per chunk
    with open(dstpath, mode='w', encoding='utf-8', newline='') as csv_file:
        fieldnames = ['chunk', 'output', 'vector']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for r in chunks:
            writer.writerow({'chunk': r[0], 'output': r[1], 'vector': get_vector_string(r[0])})

def get_vector_string(chunk):
    global model, tokenizer, device
    inputs = tokenizer(chunk, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
    with torch.no_grad():  # no gradients are needed for inference
        outputs = model(**inputs)
    embeddings = average_pool(outputs.last_hidden_state, inputs['attention_mask'])
    embeddings = F.normalize(embeddings, p=2, dim=1)
    # Convert the vector to a compact string: each float64 is packed into its IEEE-754
    # bit pattern and the last 7 hex digits of the mantissa are dropped to keep the CSV small
    vector_string = ",".join([hex(struct.unpack('>Q', struct.pack('>d', x))[0])[2:-7] for x in embeddings[0].cpu().numpy()])
    return vector_string
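
# The hex truncation above is lossy but round-trips with load_vectors(), which pads the
# 7 dropped hex digits back with zeros. Illustrative sketch (hypothetical value):
#   x = 0.123456789
#   s = hex(struct.unpack('>Q', struct.pack('>d', x))[0])[2:-7]    # 9 leading hex digits: sign, exponent, top of mantissa
#   y = struct.unpack('>d', binascii.unhexlify(s + '0000000'))[0]  # recovers x to roughly 7 significant digits
# That precision is ample for cosine-similarity ranking.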

def load_vectors():
    global vectors
    vectors = {}
    for name in os.listdir(vectors_dir):
        vectors[name] = []
        for filename in os.listdir(f"{vectors_dir}/{name}"):
            filepath = f"{vectors_dir}/{name}/{filename}"
            with open(filepath, mode='r', encoding='utf-8') as csv_file:
                reader = csv.DictReader(csv_file)
                for row in reader:
                    # Pad the truncated hex digits back with zeros and unpack the float64s
                    vector = np.array([struct.unpack('>d', binascii.unhexlify(x + '0000000'))[0] for x in row['vector'].split(',')])
                    vectors[name].append([row['chunk'], row['output'], vector])

def search(name, query_text, num=3):
    dt = datetime.datetime.now()
    # Embed the query text
    inputs = tokenizer(query_text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
    query_embeddings = average_pool(outputs.last_hidden_state, inputs['attention_mask'])
    query_embeddings = F.normalize(query_embeddings, p=2, dim=1).cpu().numpy()[0]
    # Compute the similarity between the query and every loaded record
    similarities = []
    for row in vectors[name]:
        similarity = cosine_similarity(query_embeddings, row[2])
        similarities.append((row, similarity))
    # Sort by similarity and keep the top num results
    top_matches = sorted(similarities, key=lambda x: x[1], reverse=True)[:num]
    result = ''
    for i, (row, similarity) in enumerate(top_matches, 1):
        if not row[1]:
            row[1] = row[0]
        result += f"#{i} {similarity*100:.2f}%\n{row[1]}\n\n"
    print(result)
    print(datetime.datetime.now() - dt)
    return result
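
# Example (illustrative; "demo" and the scores are hypothetical): after load_model(),
# embedding() and load_vectors() have run, a query returns the top matches as a
# formatted string with cosine similarity expressed as a percentage:
#   print(search("demo", "What time do you open?", num=3))
#   # -> "#1 92.31%\nOpen 9:00-18:00\n\n#2 ..."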

load_model()
load_vectors()

if __name__ == '__main__':
    embedding()