# Spaces: Running
import time | |
import gradio as gr | |
from transformers import pipeline | |
import numpy as np | |
from openai import OpenAI | |
import threading | |
import queue | |
# Speech-recognition pipeline, built once at import time and reused by
# transcribe() for every request.
transcriber = pipeline(
    task="automatic-speech-recognition",
    model="openai/whisper-base.en",
)

# Question-answering pipeline, built once at import time and reused by answer().
qa_model = pipeline(
    task="question-answering",
    model="distilbert-base-cased-distilled-squad",
)
class PubSub:
    """Minimal in-process publish/subscribe hub.

    Callbacks registered through ``subscribe`` are invoked synchronously, in
    registration order, every time ``publish`` is called.
    """

    def __init__(self) -> None:
        # Registered callbacks, oldest first.
        self.subscribers = []

    def subscribe(self, callback) -> None:
        """Register *callback* to be called with each published message."""
        self.subscribers.append(callback)

    def publish(self, message) -> None:
        """Pass *message* to every registered callback, on the calling thread."""
        for notify in self.subscribers:
            notify(message)
def predict(message, history, api_key, is_paused, pubsub):
    """Stream a chat completion for *message* on a background thread.

    Args:
        message: The new user message.
        history: Iterable of ``(user_text, assistant_text)`` pairs from
            earlier turns; they are replayed to the API before *message*.
        api_key: OpenAI API key used to build the client.
        is_paused: Single-element mutable list. While ``is_paused[0]`` is
            truthy the worker stops consuming the stream and polls every
            0.1 s until it is cleared.
        pubsub: Object with a ``publish(text)`` method. It receives the
            accumulated reply text (not just the delta) after each chunk
            that carries content.

    Returns:
        The started ``threading.Thread``, so the caller can tell when
        streaming has finished.
    """
    # Snapshot the history on the caller's thread so later mutations of the
    # shared list cannot change what is sent to the API.
    previous_turns = list(history)

    def run_prediction():
        client = OpenAI(api_key=api_key)

        history_openai_format = []
        for human, assistant in previous_turns:
            history_openai_format.append({"role": "user", "content": human})
            history_openai_format.append({"role": "assistant", "content": assistant})
        history_openai_format.append({"role": "user", "content": message})

        response = client.chat.completions.create(
            model='gpt-4o',
            messages=history_openai_format,
            temperature=1.0,
            stream=True,
        )

        partial_message = ""
        for chunk in response:
            # Hold the stream while the user has paused output.
            while is_paused[0]:
                time.sleep(0.1)
            # A streamed chunk may carry no choices at all; skip it instead
            # of raising IndexError and killing the worker.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                partial_message += delta
                pubsub.publish(partial_message)

    # Daemon thread: a worker left waiting on a pause that is never released
    # must not keep the interpreter from exiting.
    thread = threading.Thread(target=run_prediction, daemon=True)
    thread.start()
    return thread


def chat_with_api_key(api_key, message, history, is_paused):
    """Yield the growing assistant reply to *message* as it streams in.

    Starts ``predict`` on a worker thread and relays each accumulated reply
    it publishes.

    Args:
        api_key: OpenAI API key forwarded to ``predict``.
        message: The user message to answer.
        history: Mutable list of ``(user_text, assistant_text)`` pairs. The
            completed turn is appended to it once, when streaming ends or the
            generator is closed.
        is_paused: Single-element mutable pause flag forwarded to ``predict``.

    Yields:
        ``(message, [[message, accumulated_reply]])`` for every update.
    """
    pubsub = PubSub()
    result_queue = queue.Queue()
    pubsub.subscribe(result_queue.put)

    worker = predict(message, history, api_key, is_paused, pubsub)

    accumulated_message = None
    try:
        while True:
            try:
                accumulated_message = result_queue.get(timeout=0.1)
            except queue.Empty:
                # Watch only our own worker. Other threads in the process
                # (for example the web server's) would keep a process-wide
                # liveness check true forever.
                if worker.is_alive():
                    continue
                # The worker may have published just before exiting; once it
                # is dead no more items can arrive, so empty() is reliable.
                if result_queue.empty():
                    break
                continue
            yield message, [[message, accumulated_message]]
    finally:
        # Record the turn exactly once (not once per streamed chunk), even if
        # the consumer stops iterating early.
        if accumulated_message is not None:
            history.append((message, accumulated_message))
def _normalize_audio(y):
    """Return *y* as mono float32 samples scaled so the peak magnitude is 1.

    Silent or empty input is returned unscaled, avoiding a division by zero
    (NaN samples) or a ``ValueError`` from ``np.max`` on an empty array.
    """
    y = y.astype(np.float32)
    if y.ndim > 1:
        # Assumes a (samples, channels) layout, as Gradio's numpy audio
        # delivers it - TODO confirm. Channels are averaged down to mono.
        y = y.mean(axis=1)
    peak = np.max(np.abs(y)) if y.size else 0.0
    if peak > 0:
        y /= peak
    return y


def transcribe(audio):
    """Transcribe a recorded clip to text with the module-level ``transcriber``.

    Args:
        audio: ``None`` or a ``(sampling_rate, samples)`` pair, where
            ``samples`` is a NumPy array.

    Returns:
        The recognised text, or ``"No audio recorded."`` when *audio* is
        ``None`` or contains no samples.
    """
    if audio is None:
        return "No audio recorded."
    sr, y = audio
    if y.size == 0:
        return "No audio recorded."
    y = _normalize_audio(y)
    return transcriber({"sampling_rate": sr, "raw": y})["text"]
def answer(transcription):
    """Answer *transcription* with the module-level ``qa_model`` pipeline.

    The question is posed against a fixed one-sentence context, and only the
    ``answer`` field of the pipeline result is returned.
    """
    return qa_model(
        question=transcription,
        context="You are a chatbot answering general questions",
    )['answer']
def process_audio(audio):
    """Transcribe *audio* and answer the transcription.

    Returns:
        ``(transcription, rows)`` where ``rows`` holds a single
        ``[transcription, answer]`` pair, or ``("No audio recorded.", [])``
        when *audio* is ``None``.
    """
    if audio is not None:
        spoken_text = transcribe(audio)
        reply = answer(spoken_text)
        return spoken_text, [[spoken_text, reply]]
    return "No audio recorded.", []
def update_output(api_key, audio_input, state, is_paused):
    """Transcribe a recording and stream the chat reply for it.

    Args:
        api_key: OpenAI API key forwarded to ``chat_with_api_key``.
        audio_input: ``None`` or the ``(sampling_rate, samples)`` recording.
        state: Conversation history list forwarded to ``chat_with_api_key``.
        is_paused: Single-element mutable pause flag.

    Yields:
        ``(transcription_text, chatbot_rows)`` pairs:
        * ``("", state)`` once if output is paused when the call starts;
        * ``("No audio recorded.", [])`` once if there is no recording;
        * otherwise every update from ``chat_with_api_key``, stopping as soon
          as the pause flag is set.
    """
    if is_paused[0]:
        yield "", state
        return

    if audio_input is None:
        # Without this guard the placeholder text from transcribe() would be
        # sent to the chat API as if the user had said it.
        yield "No audio recorded.", []
        return

    message = transcribe(audio_input)
    for response, updated_state in chat_with_api_key(api_key, message, state, is_paused):
        if is_paused[0]:
            break
        yield response, updated_state
def clear_all():
    """Return the reset values ``(None, "", [])`` for the three cleared outputs."""
    cleared_audio = None
    cleared_text = ""
    cleared_rows = []
    return cleared_audio, cleared_text, cleared_rows
def toggle_pause(is_paused):
    """Flip the pause flag held in ``is_paused[0]`` in place.

    Returns the same list object that was passed in.
    """
    currently_paused = is_paused[0]
    is_paused[0] = not currently_paused
    return is_paused
def update_button_label(is_paused):
    """Return the pause button caption matching the flag in ``is_paused[0]``."""
    if is_paused[0]:
        return "Resume"
    return "Pause"
# UI layout and event wiring for the voice-chat demo.
with gr.Blocks() as demo:
    # NOTE(review): browsers generally do not execute <script> tags that are
    # inserted as component HTML, so this auto-scroll helper may be inert.
    # Confirm, and consider the Blocks-level JS hook if it is needed.
    gr.HTML("""
    <script>
        function ensureScrollable() {
            var chatbox = document.querySelector('.chatbot');
            if (chatbox) {
                chatbox.style.overflowY = 'auto';
                chatbox.style.maxHeight = '300px';
            }
        }
        function scrollToBottom() {
            var chatbox = document.querySelector('.chatbot');
            if (chatbox) {
                chatbox.scrollTop = chatbox.scrollHeight;
            }
        }
        function setupScrolling() {
            ensureScrollable();
            setInterval(scrollToBottom, 100);
        }
        if (document.readyState === 'loading') {
            document.addEventListener('DOMContentLoaded', setupScrolling);
        } else {
            setupScrolling();
        }
    </script>
    """)

    answer_output = gr.Chatbot(label="Answer Result", height=300)
    with gr.Row():
        audio_input = gr.Audio(label="Audio Input", sources=["microphone"], type="numpy")
        with gr.Column():
            api_key = gr.Textbox(label="API Key", placeholder="Enter your API key", type="password")
            transcription_output = gr.Textbox(label="Transcription")
            clear_button = gr.Button("Clear")
            pause_button = gr.Button("Pause")

    # Conversation history as (user, assistant) pairs.
    state = gr.State([])
    # Pause flag wrapped in a one-element list so handlers can flip it in place.
    is_paused = gr.State([False])

    # Transcribe and answer as soon as the user stops recording.
    audio_input.stop_recording(
        fn=update_output,
        inputs=[api_key, audio_input, state, is_paused],
        outputs=[transcription_output, answer_output],
    )

    def reset_conversation():
        """Reset the three visible widgets and the stored conversation history."""
        # clear_all() covers the widgets; the extra [] empties `state`, so a
        # cleared conversation is not sent to the chat API on the next turn.
        return (*clear_all(), [])

    clear_button.click(
        fn=reset_conversation,
        inputs=[],
        outputs=[audio_input, transcription_output, answer_output, state],
    )

    # Flip the pause flag first, then relabel the button to match it.
    pause_button.click(
        fn=toggle_pause,
        inputs=[is_paused],
        outputs=[is_paused],
    ).then(
        fn=update_button_label,
        inputs=[is_paused],
        outputs=[pause_button],
    )

demo.launch()