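# Flask + Celery service that turns a text prompt into a talking-avatar video:
# the prompt is optionally answered via the Ryze chat endpoint, converted to speech
# (ElevenLabs or OpenAI TTS), split into short audio chunks, and each chunk is rendered
# with the SadTalker pipeline (CropAndExtract -> Audio2Coeff -> AnimateFromCoeff).
# Finished chunk paths are streamed back to the client as server-sent events.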
from flask import Flask, request, jsonify, Response, stream_with_context
import torch
import shutil
import os
import sys
from argparse import ArgumentParser
from time import strftime
from argparse import Namespace
from src.utils.preprocess import CropAndExtract
from src.test_audio2coeff import Audio2Coeff
from src.facerender.animate import AnimateFromCoeff
from src.generate_batch import get_data
from src.generate_facerender_batch import get_facerender_data
# from src.utils.init_path import init_path
import tempfile
from openai import OpenAI, AsyncOpenAI
import threading
import elevenlabs
from elevenlabs import set_api_key, generate, play, clone, Voice, VoiceSettings
# from flask_cors import CORS, cross_origin
# from flask_swagger_ui import get_swaggerui_blueprint
import uuid
import time
from PIL import Image
import moviepy.editor as mp
import requests
import json
import pickle
from celery import Celery
# from gevent import monkey
# monkey.patch_all()
import torch.multiprocessing as t
import multiprocessing

multiprocessing.set_start_method('spawn', force=True)
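# Container for the SadTalker command-line style options; main() reads these fields
# the same way the original inference script reads an argparse Namespace.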
class AnimationConfig:
    def __init__(self, driven_audio_path, source_image_path, result_folder, pose_style, expression_scale,
                 enhancer, still, preprocess, ref_pose_video_path, image_hardcoded):
        self.driven_audio = driven_audio_path
        self.source_image = source_image_path
        self.ref_eyeblink = None
        self.ref_pose = ref_pose_video_path
        self.checkpoint_dir = './checkpoints'
        self.result_dir = result_folder
        self.pose_style = pose_style
        self.batch_size = 2
        self.expression_scale = expression_scale
        self.input_yaw = None
        self.input_pitch = None
        self.input_roll = None
        self.enhancer = enhancer
        self.background_enhancer = None
        self.cpu = False
        self.face3dvis = False
        self.still = still
        self.preprocess = preprocess
        self.verbose = False
        self.old_version = False
        self.net_recon = 'resnet50'
        self.init_path = None
        self.use_last_fc = False
        self.bfm_folder = './checkpoints/BFM_Fitting/'
        self.bfm_model = 'BFM_model_front.mat'
        self.focal = 1015.
        self.center = 112.
        self.camera_d = 10.
        self.z_near = 5.
        self.z_far = 15.
        self.device = 'cuda'
        self.image_hardcoded = image_hardcoded
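# Flask app plus a Celery instance backed by a local Redis broker/result store.
# A separate worker process must be running for the chunk tasks to execute, e.g.
# (assuming this file is saved as app.py):
#   celery -A app.celery worker --loglevel=info --pool=solo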
app = Flask(__name__)
# CORS(app)

app.config['broker_url'] = 'redis://localhost:6379/0'
app.config['result_backend'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['broker_url'])
celery.conf.update(app.config)

TEMP_DIR = None
start_time = None
chunk_tasks = []

app.config['temp_response'] = None
app.config['generation_thread'] = None
app.config['text_prompt'] = None
app.config['final_video_path'] = None
app.config['final_video_duration'] = None
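# Core SadTalker pipeline for one audio file: load the checkpoints, extract 3DMM
# coefficients from the source image, map the audio to motion coefficients, render
# the face, and return (base64_video, video_path, duration).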
def main(args):
    print("Entered main function")
    pic_path = args.source_image
    audio_path = args.driven_audio
    save_dir = args.result_dir
    pose_style = args.pose_style
    device = args.device
    batch_size = args.batch_size
    input_yaw_list = args.input_yaw
    input_pitch_list = args.input_pitch
    input_roll_list = args.input_roll
    ref_eyeblink = args.ref_eyeblink
    ref_pose = args.ref_pose
    preprocess = args.preprocess
    image_hardcoded = args.image_hardcoded

    dir_path = os.path.dirname(os.path.realpath(__file__))
    current_root_path = dir_path
    print('current_root_path ', current_root_path)

    # sadtalker_paths = init_path(args.checkpoint_dir, os.path.join(current_root_path, 'src/config'), args.size, args.old_version, args.preprocess)
    path_of_lm_croper = os.path.join(current_root_path, args.checkpoint_dir, 'shape_predictor_68_face_landmarks.dat')
    path_of_net_recon_model = os.path.join(current_root_path, args.checkpoint_dir, 'epoch_20.pth')
    dir_of_BFM_fitting = os.path.join(current_root_path, args.checkpoint_dir, 'BFM_Fitting')
    wav2lip_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'wav2lip.pth')

    audio2pose_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'auido2pose_00140-model.pth')
    audio2pose_yaml_path = os.path.join(current_root_path, 'src', 'config', 'auido2pose.yaml')
    audio2exp_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'auido2exp_00300-model.pth')
    audio2exp_yaml_path = os.path.join(current_root_path, 'src', 'config', 'auido2exp.yaml')

    free_view_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'facevid2vid_00189-model.pth.tar')

    if preprocess == 'full':
        mapping_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'mapping_00109-model.pth.tar')
        facerender_yaml_path = os.path.join(current_root_path, 'src', 'config', 'facerender_still.yaml')
    else:
        mapping_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'mapping_00229-model.pth.tar')
        facerender_yaml_path = os.path.join(current_root_path, 'src', 'config', 'facerender.yaml')

    # preprocess_model = CropAndExtract(sadtalker_paths, device)
    # init model
    print(path_of_net_recon_model)
    preprocess_model = CropAndExtract(path_of_lm_croper, path_of_net_recon_model, dir_of_BFM_fitting, device)

    # audio_to_coeff = Audio2Coeff(sadtalker_paths, device)
    audio_to_coeff = Audio2Coeff(audio2pose_checkpoint, audio2pose_yaml_path,
                                 audio2exp_checkpoint, audio2exp_yaml_path,
                                 wav2lip_checkpoint, device)

    # animate_from_coeff = AnimateFromCoeff(sadtalker_paths, device)
    animate_from_coeff = AnimateFromCoeff(free_view_checkpoint, mapping_checkpoint,
                                          facerender_yaml_path, device)

    first_frame_dir = os.path.join(save_dir, 'first_frame_dir')
    os.makedirs(first_frame_dir, exist_ok=True)

    # first_coeff_path, crop_pic_path, crop_info = preprocess_model.generate(pic_path, first_frame_dir, args.preprocess,
    #                                                                        source_image_flag=True, pic_size=args.size)
    # fixed_temp_dir = "/tmp/preprocess_data"
    # os.makedirs(fixed_temp_dir, exist_ok=True)
    # preprocessed_data_path = os.path.join(fixed_temp_dir, "preprocessed_data.pkl")
    # if os.path.exists(preprocessed_data_path) and image_hardcoded == "yes":
    #     print("Loading preprocessed data...")
    #     with open(preprocessed_data_path, "rb") as f:
    #         preprocessed_data = pickle.load(f)
    #     first_coeff_new_path = preprocessed_data["first_coeff_path"]
    #     crop_pic_new_path = preprocessed_data["crop_pic_path"]
    #     crop_info_path = preprocessed_data["crop_info_path"]
    #     with open(crop_info_path, "rb") as f:
    #         crop_info = pickle.load(f)
    #     print(f"Loaded existing preprocessed data from: {preprocessed_data_path}")
    # else:
    #     print("Running preprocessing...")
    #     first_coeff_path, crop_pic_path, crop_info = preprocess_model.generate(pic_path, first_frame_dir, args.preprocess, source_image_flag=True)
    #     first_coeff_new_path = os.path.join(fixed_temp_dir, os.path.basename(first_coeff_path))
    #     crop_pic_new_path = os.path.join(fixed_temp_dir, os.path.basename(crop_pic_path))
    #     crop_info_new_path = os.path.join(fixed_temp_dir, "crop_info.pkl")
    #     shutil.move(first_coeff_path, first_coeff_new_path)
    #     shutil.move(crop_pic_path, crop_pic_new_path)
    #     with open(crop_info_new_path, "wb") as f:
    #         pickle.dump(crop_info, f)
    #     preprocessed_data = {"first_coeff_path": first_coeff_new_path,
    #                          "crop_pic_path": crop_pic_new_path,
    #                          "crop_info_path": crop_info_new_path}
    #     with open(preprocessed_data_path, "wb") as f:
    #         pickle.dump(preprocessed_data, f)
    #     print(f"Preprocessed data saved to: {preprocessed_data_path}")

    first_coeff_path, crop_pic_path, crop_info = preprocess_model.generate(pic_path, first_frame_dir, args.preprocess, source_image_flag=True)
    print('first_coeff_path ', first_coeff_path)
    print('crop_pic_path ', crop_pic_path)
    print('crop_info ', crop_info)
    if first_coeff_path is None:
        print("Can't get the coeffs of the input")
        return

    if ref_eyeblink is not None:
        ref_eyeblink_videoname = os.path.splitext(os.path.split(ref_eyeblink)[-1])[0]
        ref_eyeblink_frame_dir = os.path.join(save_dir, ref_eyeblink_videoname)
        os.makedirs(ref_eyeblink_frame_dir, exist_ok=True)
        # ref_eyeblink_coeff_path, _, _ = preprocess_model.generate(ref_eyeblink, ref_eyeblink_frame_dir, args.preprocess, source_image_flag=False)
        ref_eyeblink_coeff_path, _, _ = preprocess_model.generate(ref_eyeblink, ref_eyeblink_frame_dir)
    else:
        ref_eyeblink_coeff_path = None
    print('ref_eyeblink_coeff_path', ref_eyeblink_coeff_path)

    if ref_pose is not None:
        if ref_pose == ref_eyeblink:
            ref_pose_coeff_path = ref_eyeblink_coeff_path
        else:
            ref_pose_videoname = os.path.splitext(os.path.split(ref_pose)[-1])[0]
            ref_pose_frame_dir = os.path.join(save_dir, ref_pose_videoname)
            os.makedirs(ref_pose_frame_dir, exist_ok=True)
            # ref_pose_coeff_path, _, _ = preprocess_model.generate(ref_pose, ref_pose_frame_dir, args.preprocess, source_image_flag=False)
            ref_pose_coeff_path, _, _ = preprocess_model.generate(ref_pose, ref_pose_frame_dir)
    else:
        ref_pose_coeff_path = None
    print('ref_pose_coeff_path', ref_pose_coeff_path)
    batch = get_data(first_coeff_path, audio_path, device, ref_eyeblink_coeff_path, still=args.still)
    coeff_path = audio_to_coeff.generate(batch, save_dir, pose_style, ref_pose_coeff_path)

    if args.face3dvis:
        from src.face3d.visualize import gen_composed_video
        gen_composed_video(args, device, first_coeff_path, coeff_path, audio_path, os.path.join(save_dir, '3dface.mp4'))

    # data = get_facerender_data(coeff_path, crop_pic_path, first_coeff_path, audio_path,
    #                            batch_size, input_yaw_list, input_pitch_list, input_roll_list,
    #                            expression_scale=args.expression_scale, still_mode=args.still, preprocess=args.preprocess, size=args.size)
    data = get_facerender_data(coeff_path, crop_pic_path, first_coeff_path, audio_path,
                               batch_size, input_yaw_list, input_pitch_list, input_roll_list,
                               expression_scale=args.expression_scale, still_mode=args.still, preprocess=args.preprocess)

    # result, base64_video, temp_file_path = animate_from_coeff.generate(data, save_dir, pic_path, crop_info,
    #                                                                    enhancer=args.enhancer, background_enhancer=args.background_enhancer, preprocess=args.preprocess, img_size=args.size)
    multiprocessing.set_start_method('spawn', force=True)
    result, base64_video, temp_file_path, new_audio_path = animate_from_coeff.generate(data, save_dir, pic_path, crop_info,
                                                                                       enhancer=args.enhancer,
                                                                                       background_enhancer=args.background_enhancer,
                                                                                       preprocess=args.preprocess)

    video_clip = mp.VideoFileClip(temp_file_path)
    duration = video_clip.duration

    app.config['temp_response'] = base64_video
    app.config['final_video_path'] = temp_file_path
    app.config['final_video_duration'] = duration

    return base64_video, temp_file_path, duration
def create_temp_dir():
    return tempfile.TemporaryDirectory()

def save_uploaded_file(file, filename, TEMP_DIR):
    print("Entered save_uploaded_file")
    unique_filename = str(uuid.uuid4()) + "_" + filename
    file_path = os.path.join(TEMP_DIR.name, unique_filename)
    file.save(file_path)
    return file_path
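# OpenAI client used by generate_audio() for the non-cloned 'nova' voice (tts-1-hd);
# expects OPENAI_API_KEY in the environment.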
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

# def openai_chat_avatar(text_prompt):
#     response = client.chat.completions.create(
#         model="gpt-4o-mini",
#         messages=[{"role": "system", "content": "Answer using the minimum words you can ever use."},
#                   {"role": "user", "content": f"Hi! I need help with something. Can you assist me with the following: {text_prompt}"},
#         ],
#         max_tokens=len(text_prompt) + 300  # Use the length of the input text
#         # temperature=0.3,
#         # stop=["Translate:", "Text:"]
#     )
#     return response
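# Sends the question to the Ryze streaming chat endpoint and returns the raw
# server-sent-events text; generate_video() parses the 'event: data' blocks out of it.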
def ryzedb_chat_avatar(question):
    url = "https://inference.dev.ryzeai.ai/chat/stream"
    question = question + ". Summarize and Answer using the minimum words you can ever use."
    payload = json.dumps({
        "input": {
            "chat_history": [],
            "app_id": os.getenv('RYZE_APP_ID'),
            "question": question
        },
        "config": {}
    })
    headers = {
        'Content-Type': 'application/json'
    }
    try:
        # Send the POST request
        response = requests.request("POST", url, headers=headers, data=payload)
        # Check for a successful request
        response.raise_for_status()
        # Return the raw response text (server-sent events)
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return None
def custom_cleanup(temp_dir, exclude_dir):
    # Iterate over the files and directories in TEMP_DIR
    for filename in os.listdir(temp_dir):
        file_path = os.path.join(temp_dir, filename)
        # Skip the directory we want to exclude
        if file_path != exclude_dir:
            try:
                if os.path.isdir(file_path):
                    shutil.rmtree(file_path)
                else:
                    os.remove(file_path)
                print(f"Deleted: {file_path}")
            except Exception as e:
                print(f"Failed to delete {file_path}. Reason: {e}")
def generate_audio(voice_cloning, voice_gender, text_prompt):
    print("generate_audio")
    if voice_cloning == 'no':
        if voice_gender == 'male':
            voice = 'echo'
            print('Entering Audio creation using elevenlabs')
            set_api_key("92e149985ea2732b4359c74346c3daee")

            audio = generate(text=text_prompt, voice="Daniel", model="eleven_multilingual_v2", stream=True, latency=4)
            with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="text_to_speech_", dir=TEMP_DIR.name, delete=False) as temp_file:
                for chunk in audio:
                    temp_file.write(chunk)
                driven_audio_path = temp_file.name
            print('driven_audio_path', driven_audio_path)
            print('Audio file saved using elevenlabs')
        else:
            voice = 'nova'
            print('Entering Audio creation using OpenAI TTS')
            response = client.audio.speech.create(model="tts-1-hd",
                                                  voice=voice,
                                                  input=text_prompt)
            print('Audio created using OpenAI TTS')
            with tempfile.NamedTemporaryFile(suffix=".wav", prefix="text_to_speech_", dir=TEMP_DIR.name, delete=False) as temp_file:
                driven_audio_path = temp_file.name
            response.write_to_file(driven_audio_path)
            print('Audio file saved using OpenAI TTS')

    elif voice_cloning == 'yes':
        set_api_key("92e149985ea2732b4359c74346c3daee")
        # voice = clone(name="User Cloned Voice",
        #               files=[user_voice_path])
        voice = Voice(voice_id="CEii8R8RxmB0zhAiloZg", name="Marc",
                      settings=VoiceSettings(stability=0.71, similarity_boost=0.5, style=0.0, use_speaker_boost=True))
        audio = generate(text=text_prompt, voice=voice, model="eleven_multilingual_v2", stream=True, latency=4)
        with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="cloned_audio_", dir=TEMP_DIR.name, delete=False) as temp_file:
            for chunk in audio:
                temp_file.write(chunk)
            driven_audio_path = temp_file.name
        print('driven_audio_path', driven_audio_path)

    return driven_audio_path
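# Cuts the driven audio into fixed-length chunks (default 5 s) with moviepy so that each
# chunk can be rendered as an independent Celery task and streamed back as soon as it is done.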
def split_audio(audio_path, chunk_duration=5):
    audio_clip = mp.AudioFileClip(audio_path)
    total_duration = audio_clip.duration
    audio_chunks = []

    for start_time in range(0, int(total_duration), chunk_duration):
        end_time = min(start_time + chunk_duration, total_duration)
        chunk = audio_clip.subclip(start_time, end_time)
        with tempfile.NamedTemporaryFile(suffix=f"_chunk_{start_time}-{end_time}.wav", prefix="audio_chunk_", dir=TEMP_DIR.name, delete=False) as temp_file:
            chunk_path = temp_file.name
            chunk.write_audiofile(chunk_path)
            audio_chunks.append(chunk_path)

    return audio_chunks
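# Celery task: rebuilds an AnimationConfig from the plain dict (task arguments must be
# serializable) and renders one audio chunk into a video chunk via main().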
@celery.task
def process_video_for_chunk(audio_chunk_path, args_dict, chunk_index):
    print("Entered process_video_for_chunk")
    args = AnimationConfig(
        driven_audio_path=args_dict['driven_audio_path'],
        source_image_path=args_dict['source_image_path'],
        result_folder=args_dict['result_folder'],
        pose_style=args_dict['pose_style'],
        expression_scale=args_dict['expression_scale'],
        enhancer=args_dict['enhancer'],
        still=args_dict['still'],
        preprocess=args_dict['preprocess'],
        ref_pose_video_path=args_dict['ref_pose_video_path'],
        image_hardcoded=args_dict['image_hardcoded']
    )
    args.driven_audio = audio_chunk_path

    chunk_save_dir = os.path.join(args.result_dir, f"chunk_{chunk_index}")
    os.makedirs(chunk_save_dir, exist_ok=True)
    print("args", args)

    try:
        base64_video, video_chunk_path, duration = main(args)
        print(f"Main function returned: {video_chunk_path}, {duration}")
        return video_chunk_path
    except Exception as e:
        print(f"Error in process_video_for_chunk: {str(e)}")
        raise
    # base64_video, video_chunk_path, duration = main(args)
    # return video_chunk_path
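# POST endpoint: reads the form fields, optionally rewrites the prompt via ryzedb,
# synthesizes the TTS audio, splits it into chunks, and enqueues one Celery render
# task per chunk.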
@app.route('/generate_video', methods=['POST'])  # route path is an assumption
def generate_video():
    global start_time
    global chunk_tasks
    start_time = time.time()
    global TEMP_DIR
    TEMP_DIR = create_temp_dir()
    print('request:', request.method)

    try:
        if request.method == 'POST':
            # source_image = request.files['source_image']
            image_path = '/home/user/app/images/out.jpg'
            source_image = Image.open(image_path)
            text_prompt = request.form['text_prompt']
            print('Input text prompt: ', text_prompt)
            text_prompt = text_prompt.strip()
            if not text_prompt:
                return jsonify({'error': 'Input text prompt cannot be blank'}), 400

            voice_cloning = request.form.get('voice_cloning', 'yes')
            image_hardcoded = request.form.get('image_hardcoded', 'yes')
            chat_model_used = request.form.get('chat_model_used', 'openai')
            target_language = request.form.get('target_language', 'original_text')
            print('target_language', target_language)
            pose_style = int(request.form.get('pose_style', 1))
            expression_scale = float(request.form.get('expression_scale', 1))
            enhancer = request.form.get('enhancer', None)
            voice_gender = request.form.get('voice_gender', 'male')
            still_str = request.form.get('still', 'False')
            still = still_str.lower() == 'true'
            print('still', still)
            preprocess = request.form.get('preprocess', 'crop')
            print('preprocess selected: ', preprocess)
            ref_pose_video = request.files.get('ref_pose', None)

            if chat_model_used == 'ryzedb':
                response = ryzedb_chat_avatar(text_prompt)
                events = response.split('\r\n\r\n')
                content = None
                for event in events:
                    # Split each event block by "\r\n" to get the lines
                    lines = event.split('\r\n')
                    if len(lines) > 1 and lines[0] == 'event: data':
                        # Extract the JSON part from the second line and parse it
                        json_data = lines[1].replace('data: ', '')
                        try:
                            data = json.loads(json_data)
                            text_prompt = data.get('content')
                            app.config['text_prompt'] = text_prompt
                            print('Final output text prompt using ryzedb: ', text_prompt)
                            break  # Exit the loop once content is found
                        except json.JSONDecodeError:
                            continue
            else:
                # response = openai_chat_avatar(text_prompt)
                # text_prompt = response.choices[0].message.content.strip()
                app.config['text_prompt'] = text_prompt
                print('Final output text prompt using openai: ', text_prompt)

            source_image_path = save_uploaded_file(source_image, 'source_image.png', TEMP_DIR)
            print(source_image_path)

            driven_audio_path = generate_audio(voice_cloning, voice_gender, text_prompt)

            chunk_duration = 5
            print(f"Splitting the audio into {chunk_duration}-second chunks...")
            audio_chunks = split_audio(driven_audio_path, chunk_duration=chunk_duration)
            print(f"Audio has been split into {len(audio_chunks)} chunks: {audio_chunks}")

            save_dir = tempfile.mkdtemp(dir=TEMP_DIR.name)
            result_folder = os.path.join(save_dir, "results")
            os.makedirs(result_folder, exist_ok=True)

            ref_pose_video_path = None
            if ref_pose_video:
                with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="ref_pose_", dir=TEMP_DIR.name, delete=False) as temp_file:
                    ref_pose_video_path = temp_file.name
                    ref_pose_video.save(ref_pose_video_path)
                print('ref_pose_video_path', ref_pose_video_path)

    except Exception as e:
        app.logger.error(f"An error occurred: {e}")
        return "An error occurred", 500
    # args = AnimationConfig(driven_audio_path=driven_audio_path, source_image_path=source_image_path, result_folder=result_folder, pose_style=pose_style, expression_scale=expression_scale, enhancer=enhancer, still=still, preprocess=preprocess, ref_pose_video_path=ref_pose_video_path, image_hardcoded=image_hardcoded)
    args_dict = {
        'driven_audio_path': driven_audio_path,
        'source_image_path': source_image_path,
        'result_folder': result_folder,
        'pose_style': pose_style,
        'expression_scale': expression_scale,
        'enhancer': enhancer,
        'still': still,
        'preprocess': preprocess,
        'ref_pose_video_path': ref_pose_video_path,
        'image_hardcoded': image_hardcoded,
        'device': 'cuda' if torch.cuda.is_available() else 'cpu'}

    # if torch.cuda.is_available() and not args.cpu:
    #     args.device = "cuda"
    # else:
    #     args.device = "cpu"

    print("audio_chunks:", audio_chunks)
    try:
        for index, audio_chunk in enumerate(audio_chunks):
            print(f"Submitting chunk {index} with audio_chunk: {audio_chunk}")
            task = process_video_for_chunk.apply_async(args=[audio_chunk, args_dict, index])
            print(f"Task {task.id} submitted for chunk {index}")
            chunk_tasks.append(task)
        print("chunk_tasks", chunk_tasks)
        return jsonify({'status': 'Video generation started'}), 200
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
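# SSE endpoint: polls the Celery tasks queued by generate_video() and streams each finished
# chunk's file path as a 'data:' event, then cleans TEMP_DIR (keeping /tmp/preprocess_data).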
@app.route('/stream_video_chunks', methods=['GET'])  # route path is an assumption
def stream_video_chunks():
    global chunk_tasks
    print("chunk_tasks:", chunk_tasks)

    def generate_chunks():
        video_chunk_paths = []
        unfinished_tasks = chunk_tasks[:]
        while unfinished_tasks:  # Keep running until all tasks are finished
            for task in unfinished_tasks[:]:  # Iterate over a copy of the list
                if task.ready():  # Check if the task is finished
                    try:
                        video_chunk_path = task.get()  # Get the result (chunk path)
                        video_chunk_paths.append(video_chunk_path)
                        yield f'data: {video_chunk_path}\n\n'  # Stream the chunk path to the frontend
                        app.logger.info(f"Chunk generated and sent: {video_chunk_path}")
                        os.remove(video_chunk_path)  # Optionally delete the chunk after sending
                        unfinished_tasks.remove(task)  # Remove the finished task
                    except Exception as e:
                        app.logger.error(f"Error while fetching task result: {str(e)}")
                        yield f'data: error\n\n'
            time.sleep(1)  # Avoid busy waiting, check every second

        preprocess_dir = os.path.join("/tmp", "preprocess_data")
        custom_cleanup(TEMP_DIR.name, preprocess_dir)
        app.logger.info("Temporary files cleaned up, but preprocess_data is retained.")

    # Return the generator that streams the data as it becomes available
    return Response(generate_chunks(), content_type='text/event-stream')
@app.route('/health', methods=['GET'])  # route path is an assumption
def health_status():
    response = {"online": "true"}
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)
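
# Example client usage (illustrative only; the route paths above are assumptions, and a
# Redis server plus a Celery worker must already be running):
#
#   curl -X POST http://localhost:5000/generate_video \
#        -F "text_prompt=Hello there" \
#        -F "voice_cloning=yes" \
#        -F "preprocess=crop"
#
#   # then consume the chunk paths as they are produced:
#   curl -N http://localhost:5000/stream_video_chunks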