Spaces:
Runtime error
Runtime error
"""Run codes.""" | |
# pylint: disable=line-too-long, broad-exception-caught, invalid-name, missing-function-docstring, too-many-instance-attributes, missing-class-docstring | |
# ruff: noqa: E501 | |
import os | |
import platform | |
import random | |
import time | |
from dataclasses import asdict, dataclass | |
from pathlib import Path | |
# from types import SimpleNamespace | |
import gradio as gr | |
import psutil | |
from about_time import about_time | |
from ctransformers import AutoModelForCausalLM | |
from dl_hf_model import dl_hf_model | |
from loguru import logger | |
# Model checkpoint selection.
#
# Alternatives that were tried and superseded (kept for reference only):
#   savvamadar/ggml-gpt4all-j-v1.3-groovy            ggml-gpt4all-j-v1.3-groovy.bin
#   TheBloke/Llama-2-13B-GGML                        llama-2-13b.ggmlv3.q4_K_S.bin       (7.37G)
#   TheBloke/Llama-2-13B-chat-GGML                   llama-2-13b-chat.ggmlv3.q3_K_L.bin  (6.93G)
#   TheBloke/Llama-2-13B-chat-GGML                   llama-2-13b-chat.ggmlv3.q4_K_M.bin  (7.87G)
#   TheBloke/Llama-2-13B-chat-GGML                   llama-2-13b-chat.ggmlv3.q2_K.bin
#   TheBloke/Llama-2-7B-Chat-GGML                    llama-2-7b-chat.ggmlv3.q2_K.bin     (2.87G)
#   TheBloke/Llama-2-7B-Chat-GGML                    llama-2-7b-chat.ggmlv3.q4_K_M.bin
URL = "https://huggingface.co/TheBloke/Wizard-Vicuna-7B-Uncensored-GGML/raw/main/Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_K_M.bin"  # 4.05G

# Checkpoint used when the small-model switch below evaluates falsy.
url = "https://huggingface.co/localmodels/Llama-2-13B-Chat-ggml/blob/main/llama-2-13b-chat.ggmlv3.q4_K_S.bin"  # 7.37G

# Small-model switch: keyed on the host name and the presence of /kaggle,
# but the trailing `or 1` currently forces it on everywhere.
_ = (
    any(marker in platform.node() for marker in ("golay", "okteto"))
    or Path("/kaggle").exists()
    # or psutil.cpu_count(logical=False) < 4
    or 1  # run 7b in hf
)
if _:
    url = "https://huggingface.co/TheBloke/llama2_7b_chat_uncensored-GGML/blob/main/llama2_7b_chat_uncensored.ggmlv3.q4_K_M.bin"  # 4.08G
# Template wrapped around every user question; `{question}` is filled in
# with str.format when a prompt is built.
prompt_template = "### HUMAN:\n{question}\n### RESPONSE:"

# Derive a candidate stop marker from the second-to-last non-blank template
# line: everything up to its first ":" plus the colon itself. With the
# template above this evaluates to "{question}:"; the value is only logged.
_ = list(filter(str.strip, prompt_template.splitlines()))
stop_string = _[-2].split(":")[0] + ":"
logger.debug(f"{stop_string=} not used")
# Number of worker threads to offer the model: physical cores minus one,
# but never less than one.
# psutil.cpu_count(logical=False) may return None when the physical core
# count cannot be determined, so guard before doing arithmetic on it.
_ = psutil.cpu_count(logical=False)
cpu_count: int = max(int(_) - 1, 1) if _ else 1
logger.debug(f"{cpu_count=}")
# Fetch the checkpoint named by `url`, then load it. A failed download is
# logged and terminates the process with exit status 1.
LLM = None
try:
    model_loc, file_size = dl_hf_model(url)
except Exception as download_err:
    logger.error(download_err)
    raise SystemExit(1) from download_err
else:
    LLM = AutoModelForCausalLM.from_pretrained(
        model_loc,
        model_type="llama",
        # threads=cpu_count,
    )
    logger.info(f"done load llm {model_loc=} {file_size=}G")
# Set the TZ environment variable and ask the C runtime to re-read it.
os.environ.update(TZ="Asia/Shanghai")
try:
    time.tzset()  # type: ignore # pylint: disable=no-member
except Exception:
    # Windows
    logger.warning("Windows, cant run time.tzset()")
# Disabled snippet parked in a throwaway string: it would build a
# SimpleNamespace holding a response buffer and a generator, but the
# SimpleNamespace import is commented out and nothing reads this value.
_ = """
ns = SimpleNamespace(
response="",
generator=(_ for _ in []),
)
# """
@dataclass
class GenerationConfig:
    """Keyword options passed to the model call.

    Instances are expanded with ``dataclasses.asdict`` and forwarded as
    ``**kwargs`` to the LLM, so every field name must be a keyword the model
    call accepts. The class must be a dataclass: callers construct it with
    keyword overrides (e.g. ``GenerationConfig(reset=True)``) and ``asdict``
    rejects non-dataclass instances.
    """

    temperature: float = 0.7
    top_k: int = 50
    top_p: float = 0.9
    repetition_penalty: float = 1.0
    max_new_tokens: int = 512
    seed: int = 42
    reset: bool = False
    stream: bool = True
    # Defaults to the module-level thread budget computed at import time.
    threads: int = cpu_count
    # stop: list[str] = field(default_factory=lambda: [stop_string])
def generate(
    question: str,
    llm=LLM,
    config: GenerationConfig = GenerationConfig(),
):
    """Wrap ``question`` in the prompt template and run it through ``llm``.

    Args:
        question: Text substituted for ``{question}`` in ``prompt_template``.
        llm: Callable model; defaults to the module-level ``LLM``.
        config: Generation options, expanded into keyword arguments.

    Returns:
        Whatever ``llm`` returns for the prompt (a generator when streaming
        is enabled in ``config``).
    """
    prompt = prompt_template.format(question=question)
    print("\n [PROMPT]: ", prompt)
    llm_kwargs = asdict(config)
    return llm(prompt, **llm_kwargs)
# Log the default generation settings once at import time. The `=` specifier
# makes the f-string print the expression text followed by its value.
logger.debug(f"{asdict(GenerationConfig())=}")
def user(user_message, history):
    """Queue a new user turn and leave the message text in place.

    Appends ``[user_message, None]`` to ``history`` in place (the ``None``
    slot is where the reply is written later) and returns the unchanged
    message together with that same list.
    """
    pending_turn = [user_message, None]
    history.append(pending_turn)
    return (user_message, history)  # keep user_message
def user1(user_message, history):
    """Queue a new user turn and blank the message text.

    Appends ``[user_message, None]`` to ``history`` in place and returns an
    empty string (so the input box is cleared) together with that same list.
    """
    pending_turn = [user_message, None]
    history.append(pending_turn)
    return ("", history)  # clear user_message
def updateprompt(ptemp):
    """Replace the module-level ``prompt_template`` used to build prompts.

    Args:
        ptemp: New template text. It must be a ``str`` containing the
            ``{question}`` placeholder and must be fillable by
            ``str.format`` with only a ``question`` field.

    Returns:
        None. An unusable value is reported on stdout and ignored, so a bad
        template can never break later prompt construction.
    """
    # Without this declaration the assignment below would only bind a local
    # name and the template used for generation would never change.
    global prompt_template  # pylint: disable=global-statement

    if not isinstance(ptemp, str) or "{question}" not in ptemp:
        print("[Ignored prompt tempt] ", ptemp)
        return

    # Trial-format to reject stray braces / unknown fields up front.
    try:
        ptemp.format(question="")
    except (IndexError, KeyError, ValueError) as exc:
        print("[Ignored prompt tempt] ", ptemp, exc)
        return

    print("[Changed prompt tempt] ", ptemp)
    prompt_template = ptemp
def bot(history):
    """Stream the model's reply to the latest user turn into ``history``.

    Reads the question from ``history[-1][0]``, generates with a fresh
    ``GenerationConfig(reset=True)``, and after every received chunk writes
    the text so far into ``history[-1][1]`` and yields ``history``. The
    streamed text is prefixed with the delay before the first chunk; the
    final yield replaces that prefix with a trailing timing summary.

    Args:
        history: List of ``[user_message, reply]`` pairs; mutated in place.

    Yields:
        The same ``history`` list after each update.
    """
    user_message = history[-1][0]
    response = []

    logger.debug(f"{user_message=}")

    with about_time() as atime:  # type: ignore
        prefix = ""
        then = time.time()

        logger.debug("about to generate")

        config = GenerationConfig(reset=True)
        for elm in generate(user_message, config=config):
            if not response:
                # First chunk: record how long the model took to start.
                logger.debug("in the loop")
                prefix = f"({time.time() - then:.2f}s) "
                print(prefix, end="", flush=True)
                logger.debug(f"{prefix=}")
            print(elm, end="", flush=True)
            # logger.debug(f"{elm}")

            response.append(elm)
            history[-1][1] = prefix + "".join(response)
            yield history

    text = "".join(response)
    # Guard the per-character rate: an empty reply would otherwise raise
    # ZeroDivisionError and abort the final update.
    if text:
        rate = f"{atime.duration / len(text):.2f}s/char"  # type: ignore
    else:
        rate = "no output"
    summary = f"(time elapsed: {atime.duration_human}, {rate})"  # type: ignore

    history[-1][1] = f"{text}\n{summary}"
    yield history
def predict_api(prompt):
    """Answer ``prompt`` in one non-streaming call for the API endpoint.

    Uses fixed low-temperature settings with ``reset=True`` and
    ``stream=False``. Any exception is logged and returned as its
    ``exc=...`` text instead of propagating.

    Args:
        prompt: Question text handed to ``generate``.

    Returns:
        The value returned by ``generate``, or the error text on failure.
    """
    logger.debug(f"{prompt=}")
    try:
        # user_prompt = prompt
        api_settings = {
            "temperature": 0.2,
            "top_k": 10,
            "top_p": 0.9,
            "repetition_penalty": 1.0,
            "max_new_tokens": 512,  # adjust as needed
            "seed": 42,
            "reset": True,  # reset history (cache)
            "stream": False,
            # threads=cpu_count,
            # stop=prompt_prefix[1:2],
        }
        response = generate(prompt, config=GenerationConfig(**api_settings))
        logger.debug(f"api: {response=}")
        return response
    except Exception as exc:
        logger.error(exc)
        # bot = {"inputs": [response]}
        # bot = [(prompt, response)]
        return f"{exc=}"
logger.info("start block")

# Snapshot of the template in effect when the UI is built; the
# "Reset System Prompt" button restores this value.
default_prompt_template = prompt_template

# Build the chat UI: chat window, message box with Submit/Stop/Clear
# buttons, an "Advanced Options" panel for the prompt template, and a hidden
# panel that exposes predict_api under the API name "api".
with gr.Blocks(
    title=f"{Path(model_loc).name}",
) as block:
    chatbot = gr.Chatbot(height=500)

    with gr.Row():
        with gr.Column(scale=5):
            msg = gr.Textbox(
                label="Chat Message Box",
                placeholder="Ask me anything (press Shift+Enter or click Submit to send)",
                show_label=False,
                # container=False,
                lines=6,
                max_lines=30,
                show_copy_button=True,
                # ).style(container=False)
            )
        with gr.Column(scale=1, min_width=50):
            with gr.Row():
                submit = gr.Button("Submit", elem_classes="xsmall")
                stop = gr.Button("Stop", visible=True)
                clear = gr.Button("Clear History", visible=True)

    with gr.Row(visible=True):
        with gr.Accordion("Advanced Options:", open=False):
            with gr.Row():
                with gr.Column(scale=2):
                    system = gr.Textbox(
                        label="System Prompt",
                        placeholder=prompt_template,
                        show_label=False,
                        # container=False,
                        lines=6,
                        max_lines=30,
                        # ).style(container=False)
                    )
                with gr.Column():
                    with gr.Row():
                        change = gr.Button("Change System Prompt")
                        reset = gr.Button("Reset System Prompt")

    # Enter in the message box: record the turn (keeping the text), then
    # stream the reply.
    msg_submit_event = msg.submit(
        # fn=conversation.user_turn,
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=True,
        show_progress="full",
        # api_name=None,
    ).then(bot, chatbot, chatbot, queue=True)

    # Submit button: record the turn (clearing the text), then stream the
    # reply.
    submit_click_event = submit.click(
        # fn=lambda x, y: ("",) + user(x, y)[1:],  # clear msg
        fn=user1,  # clear msg
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=True,
        # queue=False,
        show_progress="full",
        # api_name=None,
    ).then(bot, chatbot, chatbot, queue=True)

    # Stop cancels whichever generation chain is running.
    stop.click(
        fn=None,
        inputs=None,
        outputs=None,
        cancels=[msg_submit_event, submit_click_event],
        queue=False,
    )

    # Hand the System Prompt textbox content to updateprompt. Previously the
    # chat history was passed instead, and updateprompt's None return value
    # was written back into the chatbot.
    change.click(
        fn=updateprompt,
        inputs=system,
        outputs=None,
        queue=False,
    )

    # Restore the template that was active when the UI was built (this
    # button previously had no handler).
    reset.click(
        fn=lambda: updateprompt(default_prompt_template),
        inputs=None,
        outputs=None,
        queue=False,
    )

    clear.click(lambda: None, None, chatbot, queue=False)

    with gr.Accordion("For Chat/Translation API", open=False, visible=False):
        input_text = gr.Text()
        api_btn = gr.Button("Go", variant="primary")
        out_text = gr.Text()

    api_btn.click(
        predict_api,
        input_text,
        out_text,
        api_name="api",
    )
# Queue sizing notes from earlier deployments:
#   concurrency_count=5, max_size=20
#   max_size=36, concurrency_count=14
#   CPU cpu_count=2 16G, model 7G
#   CPU UPGRADE cpu_count=8 32G, model 7G
concurrency_count = 1
logger.info(f"{concurrency_count=}")

# Enable the request queue (at most 5 waiting requests), then start the app.
_queued_block = block.queue(concurrency_count=concurrency_count, max_size=5)
_queued_block.launch(debug=True)