# import whisper
from faster_whisper import WhisperModel
import datetime
import subprocess
import gradio as gr
from pathlib import Path
import pandas as pd
import re
import time
import os
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
from pytube import YouTube
import yt_dlp
import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio
from pyannote.core import Segment
from gpuinfo import GPUInfo
import wave
import contextlib
from transformers import pipeline
import psutil
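# Whisper model sizes exposed in the UI and the Whisper language codes mapped to their Portuguese display names.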
whisper_models = ["tiny", "base", "small", "medium", "large-v1", "large-v2"]
source_languages = {
    "en": "Inglês",
    "zh": "Chinês",
    "de": "Alemão",
    "es": "Espanhol",
    "ru": "Russo",
    "ko": "Coreano",
    "fr": "Francês",
    "ja": "Japonês",
    "pt": "Português",
    "tr": "Turco",
    "pl": "Polaco",
    "ca": "Catalão",
    "nl": "Holandês",
    "ar": "Árabe",
    "sv": "Sueco",
    "it": "Italiano",
    "id": "Indonésio",
    "hi": "Hindi",
    "fi": "Finlandês",
    "vi": "Vietnamita",
    "he": "Hebraico",
    "uk": "Ucraniano",
    "el": "Grego",
    "ms": "Malaio",
    "cs": "Checo",
    "ro": "Romeno",
    "da": "Dinamarquês",
    "hu": "Húngaro",
    "ta": "Tâmil",
    "no": "Norueguês",
    "th": "Tailandês",
    "ur": "Urdu",
    "hr": "Croata",
    "bg": "Búlgaro",
    "lt": "Lituano",
    "la": "Latim",
    "mi": "Maori",
    "ml": "Malaiala",
    "cy": "Galês",
    "sk": "Eslovaco",
    "te": "Telugu",
    "fa": "Persa",
    "lv": "Letão",
    "bn": "Bengali",
    "sr": "Sérvio",
    "az": "Azerbaijano",
    "sl": "Esloveno",
    "kn": "Canarim",
    "et": "Estoniano",
    "mk": "Macedónio",
    "br": "Bretão",
    "eu": "Basco",
    "is": "Islandês",
    "hy": "Arménio",
    "ne": "Nepalês",
    "mn": "Mongol",
    "bs": "Bósnio",
    "kk": "Cazaque",
    "sq": "Albanês",
    "sw": "Suaíli",
    "gl": "Galego",
    "mr": "Marata",
    "pa": "Punjabi",
    "si": "Cingalês",
    "km": "Khmer",
    "sn": "Shona",
    "yo": "Ioruba",
    "so": "Somali",
    "af": "Africâner",
    "oc": "Occitano",
    "ka": "Georgiano",
    "be": "Bielorrusso",
    "tg": "Tajique",
    "sd": "Sindi",
    "gu": "Guzerate",
    "am": "Amárico",
    "yi": "Iídiche",
    "lo": "Laosiano",
    "uz": "Usbeque",
    "fo": "Feroês",
    "ht": "Crioulo Haitiano",
    "ps": "Pashto",
    "tk": "Turcomano",
    "nn": "Nynorsk",
    "mt": "Maltês",
    "sa": "Sânscrito",
    "lb": "Luxemburguês",
    "my": "Birmanês",
    "bo": "Tibetano",
    "tl": "Tagalog",
    "mg": "Malgaxe",
    "as": "Assamês",
    "tt": "Tártaro",
    "haw": "Havaiano",
    "ln": "Lingala",
    "ha": "Hausa",
    "ba": "Bashkir",
    "jw": "Javanês",
    "su": "Sundanês",
}
source_language_list = list(source_languages.keys())
MODEL_NAME = "vumichien/whisper-medium-jp"
lang = "pt"
device = 0 if torch.cuda.is_available() else "cpu"
pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    chunk_length_s=30,
    device=device,
)
os.makedirs('output', exist_ok=True)
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(language=lang, task="transcribe")
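# Speaker-embedding model (ECAPA-TDNN from SpeechBrain, loaded through pyannote.audio); each transcript
# segment is mapped to a 192-dimensional embedding that is later clustered to separate the speakers.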
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))
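# Helper: quick transcription of a microphone recording or an uploaded audio file with the transformers ASR pipeline (no diarization).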
def transcribe(microphone, file_upload):
    warn_output = ""
    if (microphone is not None) and (file_upload is not None):
        warn_output = (
            "AVISO: Você enviou um arquivo de áudio e usou o microfone. "
            "O arquivo gravado pelo microfone será usado e o áudio enviado será descartado.\n"
        )
    elif (microphone is None) and (file_upload is None):
        return "ERRO: Você precisa usar o microfone ou fazer upload de um arquivo de áudio"
    file = microphone if microphone is not None else file_upload
    text = pipe(file)["text"]
    return warn_output + text
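# Helper: build an embeddable YouTube iframe (HTML snippet) for a given video URL.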
def _return_yt_html_embed(yt_url):
    # Handle both youtube.com/watch?v=<id> and youtu.be/<id> URLs
    if "youtu.be/" in yt_url:
        video_id = yt_url.split("youtu.be/")[-1].split("?")[0]
    else:
        video_id = yt_url.split("?v=")[-1].split("&")[0]
    HTML_str = (
        f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
        " </center>"
    )
    return HTML_str
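# Helper: download the audio of a YouTube video with yt-dlp (extracted to MP3 by ffmpeg) and transcribe it with the transformers pipeline.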
def yt_transcribe(yt_url):
    # yt = YouTube(yt_url)
    # stream = yt.streams.filter(only_audio=True)[0]
    # stream.download(filename="audio.mp3")
    html_embed_str = _return_yt_html_embed(yt_url)
    ydl_opts = {
        'format': 'bestvideo*+bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'audio.%(ext)s',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([yt_url])
    text = pipe("audio.mp3")["text"]
    return html_embed_str, text
def convert_time(secs):
    return datetime.timedelta(seconds=round(secs))
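# Download a YouTube video with yt-dlp and return the local file path, used to fill the Gradio video component.
# Example with one of the demo URLs: get_youtube("https://youtu.be/mYT33lWKJyw")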
def get_youtube(video_url):
    # yt = YouTube(video_url)
    # abs_video_path = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download()
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
        abs_video_path = ydl.prepare_filename(info)
    print("Sucesso ao baixar o vídeo")
    print(abs_video_path)
    return abs_video_path
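# Main pipeline behind the transcription button: transcribe with faster-whisper, embed and cluster the
# segments per speaker, and return a DataFrame, a system-usage summary and the path of the saved CSV.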
def speech_to_text(video_file_path, selected_source_lang, whisper_model, num_speakers):
    """
    Transcribe a video (local upload or downloaded YouTube video) using OpenAI Whisper, with speaker diarization.
    NOTE: This demo was adapted by Pedro Faria as an example for Biometrid and should not be used for other purposes.
    1. Use the Whisper model (via faster-whisper) to split the audio into segments and generate transcriptions.
    2. Generate a speaker embedding for each segment.
    3. Apply agglomerative clustering on the embeddings to identify the speaker of each segment.
    Speech recognition is based on the OpenAI Whisper models: https://github.com/openai/whisper
    Speaker diarization model and pipeline from https://github.com/pyannote/pyannote-audio
    """
    # model = whisper.load_model(whisper_model)
    # model = WhisperModel(whisper_model, device="cuda", compute_type="int8_float16")
    model = WhisperModel(whisper_model, compute_type="int8")
    time_start = time.time()
    if video_file_path is None:
        raise ValueError("Erro: nenhum vídeo de entrada")
    print(video_file_path)
    try:
        # Read and convert the input video
        _, file_ending = os.path.splitext(video_file_path)
        print(f'file ending is {file_ending}')
        audio_file = video_file_path.replace(file_ending, ".wav")
        print("A iniciar a conversão para WAV")
        os.system(f'ffmpeg -i "{video_file_path}" -ar 16000 -ac 1 -c:a pcm_s16le "{audio_file}"')
        # Get duration
        with contextlib.closing(wave.open(audio_file, 'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            duration = frames / float(rate)
        print(f"Conversão para WAV concluída, duração do arquivo de áudio: {duration}")
        # Transcribe audio
        options = dict(language=selected_source_lang, beam_size=5, best_of=5)
        transcribe_options = dict(task="transcribe", **options)
        segments_raw, info = model.transcribe(audio_file, **transcribe_options)
        # Convert the faster-whisper segments back to the original openai-whisper format
        segments = []
        for segment_chunk in segments_raw:
            chunk = {}
            chunk["start"] = segment_chunk.start
            chunk["end"] = segment_chunk.end
            chunk["text"] = segment_chunk.text
            segments.append(chunk)
        print("Transcrição de áudio com faster-whisper terminada")
    except Exception as e:
        raise RuntimeError("Erro a converter o vídeo para áudio") from e
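    # Diarization: embed each transcribed segment, cluster the embeddings, and label every segment with its speaker.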
    try:
        # Create embeddings
        def segment_embedding(segment):
            audio = Audio()
            start = segment["start"]
            # Whisper overshoots the end timestamp in the last segment
            end = min(duration, segment["end"])
            clip = Segment(start, end)
            waveform, sample_rate = audio.crop(audio_file, clip)
            return embedding_model(waveform[None])
        embeddings = np.zeros(shape=(len(segments), 192))
        for i, segment in enumerate(segments):
            embeddings[i] = segment_embedding(segment)
        embeddings = np.nan_to_num(embeddings)
        print(f'Embedding shape: {embeddings.shape}')
        if num_speakers == 0:
            # Find the best number of speakers via the silhouette score
            score_num_speakers = {}
            for n_speakers in range(2, 10 + 1):
                clustering = AgglomerativeClustering(n_speakers).fit(embeddings)
                score = silhouette_score(embeddings, clustering.labels_, metric='euclidean')
                score_num_speakers[n_speakers] = score
            best_num_speaker = max(score_num_speakers, key=lambda x: score_num_speakers[x])
            print(f"O número estimado de participantes: {best_num_speaker} com pontuação de {score_num_speakers[best_num_speaker]}")
        else:
            best_num_speaker = num_speakers
        # Assign speaker labels
        clustering = AgglomerativeClustering(best_num_speaker).fit(embeddings)
        labels = clustering.labels_
        for i in range(len(segments)):
            segments[i]["speaker"] = 'Participante ' + str(labels[i] + 1)
        # Make output
        objects = {
            'Start': [],
            'End': [],
            'Speaker': [],
            'Text': []
        }
        text = ''
        for i, segment in enumerate(segments):
            if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
                objects['Start'].append(str(convert_time(segment["start"])))
                objects['Speaker'].append(segment["speaker"])
                if i != 0:
                    objects['End'].append(str(convert_time(segments[i - 1]["end"])))
                    objects['Text'].append(text)
                    text = ''
            text += segment["text"] + ' '
        objects['End'].append(str(convert_time(segments[-1]["end"])))
        objects['Text'].append(text)
        time_end = time.time()
        time_diff = time_end - time_start
        memory = psutil.virtual_memory()
        gpu_utilization, gpu_memory = GPUInfo.gpu_usage()
        gpu_utilization = gpu_utilization[0] if len(gpu_utilization) > 0 else 0
        gpu_memory = gpu_memory[0] if len(gpu_memory) > 0 else 0
        system_info = f"""
        *Memória: {memory.total / (1024 * 1024 * 1024):.2f}GB, utilizado: {memory.percent}%, disponível: {memory.available / (1024 * 1024 * 1024):.2f}GB.*
        *Tempo de processamento: {time_diff:.5} segundos.*
        *Utilização de GPU: {gpu_utilization}%, Memória de GPU: {gpu_memory}MiB.*
        """
        save_path = "output/transcript_result.csv"
        df_results = pd.DataFrame(objects)
        df_results.to_csv(save_path, index=False, encoding="utf-8")
        return df_results, system_info, save_path
    except Exception as e:
        raise RuntimeError("Erro a correr a inferência com um modelo local") from e
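# Example of calling the pipeline directly, without the UI (hypothetical local file name):
#   df, stats, csv_path = speech_to_text("meeting.mp4", "pt", "base", num_speakers=2)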
# ---- Gradio Layout -----
# Inspiration from https://huggingface.co/spaces/RASMUS/Whisper-youtube-crosslingual-subtitles
video_in = gr.Video(label="Ficheiro Video", mirror_webcam=False)
youtube_url_in = gr.Textbox(label="Url Youtube", lines=1, interactive=True)
df_init = pd.DataFrame(columns=['Início', 'Fim', 'Participante', 'Texto'])
memory = psutil.virtual_memory()
selected_source_lang = gr.Dropdown(choices=source_language_list, type="value", value="pt", label="Linguagem detectada no vídeo", interactive=True)
selected_whisper_model = gr.Dropdown(choices=whisper_models, type="value", value="large-v2", label="Modelo Whisper selecionado", interactive=True)
number_speakers = gr.Number(precision=0, value=2, label="Insira o número de participantes para obter melhores resultados. Se o valor for 0, o modelo encontrará automaticamente a melhor quantidade.", interactive=True)
system_info = gr.Markdown(f"*Memória: {memory.total / (1024 * 1024 * 1024):.2f}GB, utilizado: {memory.percent}%, disponível: {memory.available / (1024 * 1024 * 1024):.2f}GB*")
download_transcript = gr.File(label="Download transcript")
transcription_df = gr.DataFrame(value=df_init, label="Dataframe da transcrição", row_count=(0, "dynamic"), max_rows=10, wrap=True, overflow_row_behaviour='paginate')
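# Assemble the Blocks UI: YouTube download, video input, language/model/speaker settings and the transcription outputs.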
title = "Whisper speaker diarization"
demo = gr.Blocks(title=title)
demo.encrypt = False
with demo:
    with gr.Tab("Whisper speaker diarization"):
        gr.Markdown('''
            <div>
            <h1 style='text-align: center'>Whisper diarização com participantes</h1>
            Este espaço usa os modelos Whisper da <a href='https://github.com/openai/whisper' target='_blank'><b>OpenAI</b></a> com <a href='https://github.com/guillaumekln/faster-whisper' target='_blank'><b>CTranslate2</b></a>, um motor de inferência rápido para modelos Transformer (cerca de 4 vezes mais rápido do que o modelo original da OpenAI, com a mesma precisão),
            e o modelo ECAPA-TDNN de <a href='https://github.com/speechbrain/speechbrain' target='_blank'><b>SpeechBrain</b></a> para codificar e identificar participantes
            </div>
            ''')
        with gr.Row():
            gr.Markdown('''
                ### Transcreva o link do youtube usando OpenAI Whisper
                ##### 1. Usando o modelo Whisper da OpenAI para separar o áudio em segmentos e gerar transcrições.
                ##### 2. Gerando embeddings para cada segmento.
                ##### 3. Aplicando clustering aglomerativo nos embeddings para identificar o participante de cada segmento.
                ''')
        with gr.Row():
            gr.Markdown('''
                ### Pode testar com os seguintes exemplos:
                ''')
        examples = gr.Examples(
            examples=["https://youtu.be/mYT33lWKJyw",
                      "https://youtu.be/ctirgguI7RM"],
            label="Examples", inputs=[youtube_url_in])
        with gr.Row():
            with gr.Column():
                youtube_url_in.render()
                download_youtube_btn = gr.Button("Descarregar vídeo do Youtube")
                download_youtube_btn.click(get_youtube, [youtube_url_in], [video_in])
                print(video_in)
        with gr.Row():
            with gr.Column():
                video_in.render()
            with gr.Column():
                gr.Markdown('''
                    ##### Aqui você pode iniciar o processo de transcrição.
                    ##### Por favor, selecione o idioma de origem para a transcrição.
                    ##### Você pode selecionar uma faixa de números estimados de participantes.
                    ''')
                selected_source_lang.render()
                selected_whisper_model.render()
                number_speakers.render()
                transcribe_btn = gr.Button("Transcrever áudio com diarização")
                transcribe_btn.click(speech_to_text,
                                     [video_in, selected_source_lang, selected_whisper_model, number_speakers],
                                     [transcription_df, system_info, download_transcript])
        with gr.Row():
            gr.Markdown('''
                ##### Aqui vai obter a transcrição
                ''')
        with gr.Row():
            with gr.Column():
                download_transcript.render()
                transcription_df.render()
                system_info.render()
demo.launch(debug=True)