# interview_copilot / app copy 9.py
# alex buz
# new
# 68ba2e8
import time
import gradio as gr
from transformers import pipeline
import numpy as np
from openai import OpenAI
import threading
import queue
# Speech-to-text model used by transcribe(); loaded once when the module is imported.
transcriber = pipeline(
    task="automatic-speech-recognition",
    model="openai/whisper-base.en",
)

# Extractive question-answering model used by answer(); also loaded at import time.
qa_model = pipeline(
    task="question-answering",
    model="distilbert-base-cased-distilled-squad",
)
class PubSub:
    """Minimal in-process publish/subscribe hub.

    Callbacks registered through :meth:`subscribe` are invoked synchronously,
    in registration order, on whichever thread calls :meth:`publish`.
    """

    def __init__(self):
        # Callables that accept a single positional ``message`` argument.
        self.subscribers = []

    def subscribe(self, callback):
        """Register ``callback`` to receive every subsequently published message."""
        self.subscribers.append(callback)

    def publish(self, message):
        """Deliver ``message`` to each subscriber registered when the call starts."""
        # Iterate over a snapshot: a callback that subscribes (or otherwise
        # mutates ``self.subscribers``) while a message is being delivered must
        # not extend the in-flight loop, which could otherwise never terminate.
        for subscriber in tuple(self.subscribers):
            subscriber(message)
def predict(message, history, api_key, is_paused, pubsub):
    """Stream a chat completion for ``message`` on a background thread.

    Args:
        message: The new user message to send.
        history: Iterable of ``(user, assistant)`` pairs from earlier turns;
            they are replayed to the model ahead of ``message``.
        api_key: API key used to construct the ``OpenAI`` client.
        is_paused: Single-element list; while ``is_paused[0]`` is truthy the
            worker stops consuming the response stream.
        pubsub: Object exposing ``publish(text)``. It receives the reply text
            accumulated so far after every chunk that carries content.

    Returns:
        The already-started ``threading.Thread`` running the request, so a
        caller can ``join()`` it or poll ``is_alive()``.
    """

    def run_prediction():
        client = OpenAI(api_key=api_key)

        # Convert (user, assistant) pairs into the chat-completions message list.
        history_openai_format = []
        for human, assistant in history:
            history_openai_format.append({"role": "user", "content": human})
            history_openai_format.append({"role": "assistant", "content": assistant})
        history_openai_format.append({"role": "user", "content": message})

        response = client.chat.completions.create(
            model='gpt-4o',
            messages=history_openai_format,
            temperature=1.0,
            stream=True
        )

        partial_message = ""
        for chunk in response:
            # Hold the stream while the UI is paused; polled so a resume is
            # picked up within ~0.1 s.
            while is_paused[0]:
                time.sleep(0.1)
            # A stream chunk may carry an empty ``choices`` list (the API
            # reference documents this for usage-only chunks); indexing it
            # would raise IndexError and kill the worker mid-reply.
            if not chunk.choices:
                continue
            delta_text = chunk.choices[0].delta.content
            if delta_text:
                partial_message += delta_text
                pubsub.publish(partial_message)

    thread = threading.Thread(target=run_prediction)
    thread.start()
    # Hand the worker back so callers can track this specific request.
    return thread
def chat_with_api_key(api_key, message, history, is_paused):
    """Stream the model's reply to ``message`` as it is generated.

    Args:
        api_key: API key forwarded to ``predict``.
        message: The user message to answer.
        history: Mutable list of ``(user, assistant)`` pairs. It is forwarded
            to ``predict`` and updated in place so that it ends up holding
            exactly one entry for this exchange (the latest reply text).
        is_paused: Single-element pause flag forwarded to ``predict``.

    Yields:
        ``(message, [[message, reply_so_far]])`` for every partial reply.
    """
    pubsub = PubSub()
    result_queue = queue.Queue()
    # Each published partial reply is handed to this generator via the queue.
    pubsub.subscribe(result_queue.put)

    # Work out which thread(s) serve *this* request. Checking every thread in
    # the process would never terminate: the main thread and any server or
    # pool threads stay alive for the life of the application.
    threads_before = set(threading.enumerate())
    spawned = predict(message, history, api_key, is_paused, pubsub)
    if isinstance(spawned, threading.Thread):
        workers = [spawned]
    else:
        workers = [t for t in threading.enumerate() if t not in threads_before]

    history_slot = None  # index of this exchange inside ``history``
    workers_done = False
    while True:
        try:
            accumulated_message = result_queue.get(timeout=0.1)
        except queue.Empty:
            if workers_done:
                break
            # Record completion but poll the queue once more, so a message
            # published just before the worker exited is not lost.
            workers_done = not any(worker.is_alive() for worker in workers)
            continue

        # Keep a single history entry per exchange. Appending every partial
        # reply would fill the context sent on later turns with duplicates.
        if history_slot is None:
            history.append((message, accumulated_message))
            history_slot = len(history) - 1
        else:
            history[history_slot] = (message, accumulated_message)
        yield message, [[message, accumulated_message]]
def transcribe(audio):
    """Transcribe a recording to text with the module-level ``transcriber``.

    Args:
        audio: ``None``, or a ``(sampling_rate, samples)`` tuple where
            ``samples`` is a NumPy array (the Audio component is configured
            with ``type="numpy"``).

    Returns:
        The recognised text, or ``"No audio recorded."`` when ``audio`` is
        ``None`` or contains no samples.
    """
    if audio is None:
        return "No audio recorded."

    sr, y = audio
    y = np.asarray(y)
    if y.size == 0:
        # np.max() raises on an empty array; there is nothing to transcribe.
        return "No audio recorded."

    if y.ndim > 1:
        # Down-mix to mono. Assumes a (samples, channels) layout, as in
        # Gradio's speech-recognition guide — TODO confirm for other sources.
        y = y.mean(axis=1)

    y = y.astype(np.float32)
    peak = np.max(np.abs(y))
    if peak > 0:
        # Scale to [-1, 1]; skipped for pure silence to avoid 0/0 -> NaN.
        y /= peak

    return transcriber({"sampling_rate": sr, "raw": y})["text"]
def answer(transcription):
    """Ask the module-level QA model ``transcription`` and return its answer text."""
    qa_result = qa_model(
        question=transcription,
        context="You are a chatbot answering general questions",
    )
    return qa_result["answer"]
def process_audio(audio):
    """Transcribe ``audio`` and answer it with the local QA model.

    Returns:
        ``(transcription, [[transcription, answer]])``, or
        ``("No audio recorded.", [])`` when ``audio`` is ``None``.
    """
    if audio is not None:
        spoken_text = transcribe(audio)
        reply = answer(spoken_text)
        return spoken_text, [[spoken_text, reply]]
    return "No audio recorded.", []
def update_output(api_key, audio_input, state, is_paused):
    """Generator handler: transcribe the recording and stream the chat reply.

    While paused it yields a single ``("", state)`` and stops. Otherwise it
    yields each ``(response, updated_state)`` produced by
    ``chat_with_api_key`` until the stream ends or the pause flag is set.
    """
    if is_paused[0]:
        yield "", state
        return

    spoken_text = transcribe(audio_input)
    for response, updated_state in chat_with_api_key(api_key, spoken_text, state, is_paused):
        if is_paused[0]:
            # Stop forwarding updates as soon as the user pauses.
            return
        yield response, updated_state
def clear_all():
    """Return reset values for the audio input, transcription box and chatbot."""
    audio_reset = None
    transcription_reset = ""
    chat_reset = []
    return audio_reset, transcription_reset, chat_reset
def toggle_pause(is_paused):
    """Flip the flag stored in ``is_paused[0]`` in place and return the same list."""
    currently_paused = is_paused[0]
    is_paused[0] = not currently_paused
    return is_paused
def update_button_label(is_paused):
    """Return the pause button caption matching the current pause flag."""
    if is_paused[0]:
        return "Resume"
    return "Pause"
# Build the Gradio UI: a chatbot pane, a microphone input, an API-key box,
# a transcription box and Clear / Pause buttons, then wire up their events.
# NOTE(review): the Row/Column nesting below is reconstructed — confirm it
# matches the intended layout.
with gr.Blocks() as demo:
    # Inline script meant to cap the chat box height and keep it scrolled to
    # the bottom (re-scrolls every 100 ms).
    # NOTE(review): <script> tags injected through gr.HTML may not be executed
    # by the browser, and the '.chatbot' selector is not set anywhere in this
    # file — confirm this script actually runs.
    gr.HTML("""
<script>
function ensureScrollable() {
var chatbox = document.querySelector('.chatbot');
if (chatbox) {
chatbox.style.overflowY = 'auto';
chatbox.style.maxHeight = '300px';
}
}
function scrollToBottom() {
var chatbox = document.querySelector('.chatbot');
if (chatbox) {
chatbox.scrollTop = chatbox.scrollHeight;
}
}
function setupScrolling() {
ensureScrollable();
setInterval(scrollToBottom, 100);
}
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', setupScrolling);
} else {
setupScrolling();
}
</script>
""")
    # Streams the (question, answer) pair produced by update_output.
    answer_output = gr.Chatbot(label="Answer Result", height=300)
    with gr.Row():
        # type="numpy" makes handlers receive (sample_rate, ndarray) tuples.
        audio_input = gr.Audio(label="Audio Input", sources=["microphone"], type="numpy")
        with gr.Column():
            api_key = gr.Textbox(label="API Key", placeholder="Enter your API key", type="password")
            transcription_output = gr.Textbox(label="Transcription")
            clear_button = gr.Button("Clear")
            pause_button = gr.Button("Pause")
    # Conversation history; update_output passes it to chat_with_api_key,
    # which appends to it in place.
    state = gr.State([])
    # Pause flag wrapped in a one-element list so handlers can flip it in place.
    is_paused = gr.State([False])
    # When a recording stops: transcribe it and stream the model's reply.
    audio_input.stop_recording(
        fn=update_output,
        inputs=[api_key, audio_input, state, is_paused],
        outputs=[transcription_output, answer_output]
    )
    # Reset the audio input, transcription box and chatbot.
    # NOTE(review): `state` is not among the outputs, so the stored
    # conversation history survives a Clear — confirm this is intended.
    clear_button.click(
        fn=clear_all,
        inputs=[],
        outputs=[audio_input, transcription_output, answer_output]
    )
    # Toggle the pause flag, then relabel the button to match the new state.
    pause_button.click(
        fn=toggle_pause,
        inputs=[is_paused],
        outputs=[is_paused]
    ).then(
        fn=update_button_label,
        inputs=[is_paused],
        outputs=[pause_button]
    )
# Start the web server for the app.
demo.launch()