ai-movie-video / app.py
weepakistan's picture
Update app.py
d6725a0 verified
import os
import tempfile
import shutil
import gradio as gr
import requests
import subprocess
import json
import yt_dlp
from PIL import Image
from io import BytesIO
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Get API key and base URL from environment variables
API_KEY = os.getenv('Key')
BASE_URL = os.getenv('Base')
def get_movie_or_tv_details(title):
search_url = f'{BASE_URL}/search/multi'
params = {
'api_key': API_KEY,
'query': title
}
response = requests.get(search_url, params=params)
response.raise_for_status()
results = response.json().get('results')
if not results:
raise ValueError(f"No results found for title: {title}")
details = results[0]
if details['media_type'] == 'movie':
release_date = details.get('release_date', '')
elif details['media_type'] == 'tv':
release_date = details.get('first_air_date', '')
else:
raise ValueError("Unsupported media type")
release_year = release_date.split('-')[0] if release_date else 'Unknown'
return details, release_year
def get_images_url(media_type, media_id):
if media_type == 'movie':
images_url = f'{BASE_URL}/movie/{media_id}/images'
elif media_type == 'tv':
images_url = f'{BASE_URL}/tv/{media_id}/images'
else:
raise ValueError("Unsupported media type")
params = {'api_key': API_KEY}
response = requests.get(images_url, params=params)
response.raise_for_status()
return response.json()
def save_summary_and_images(title, summary, images, media_type, year):
prefix = 'Movie' if media_type == 'movie' else 'TV Show'
if not os.path.exists('images'):
os.makedirs('images')
with open(f"{prefix} {title} {year}.txt", 'w', encoding='utf-8') as file:
file.write(summary)
for idx, image in enumerate(images[:12]):
image_url = f"https://image.tmdb.org/t/p/original{image['file_path']}"
response = requests.get(image_url)
response.raise_for_status()
img = Image.open(BytesIO(response.content))
img = img.resize((1080, 1920), Image.LANCZOS)
img.save(f"images/{prefix} {title} {idx + 1} ({year}).jpg")
def generate_voiceover(text, output_file):
edge_tts_command = [
"edge-tts",
"--text", text,
"--write-media", output_file
]
try:
subprocess.run(edge_tts_command, check=True)
print(f"Generated voiceover: {output_file}")
except subprocess.CalledProcessError as e:
print(f"Failed to generate voiceover: {e}")
def process_text_files(prefix, suffix):
current_dir = os.getcwd()
txt_files = [file for file in os.listdir(current_dir) if file.endswith('.txt')]
for txt_file in txt_files:
file_path = os.path.join(current_dir, txt_file)
file_name = os.path.splitext(txt_file)[0]
with open(file_path, 'r', encoding='utf-8') as file:
text = file.read().strip()
processed_text = f"{prefix} {file_name}: {text} {suffix}"
audio_file_path = f"{file_name}.mp3"
generate_voiceover(processed_text, audio_file_path)
print(f"Processed File: {txt_file}")
print("Exported Audio:", audio_file_path)
print()
def make_slideshow(images_folder, audio_file):
image_files = sorted([img for img in os.listdir(images_folder) if img.endswith((".jpg", ".png"))])
images = [os.path.join(images_folder, img) for img in image_files]
audio_duration = float(subprocess.run(['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', audio_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout)
commands = []
for image in images:
commands.append(f"file '{image}'")
commands.append(f"duration {audio_duration / len(images)}")
with open('filelist.txt', 'w') as filelist:
filelist.write("\n".join(commands))
base_name = os.path.splitext(os.path.basename(audio_file))[0]
output_file = f'{base_name}.mp4'
subprocess.run(['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', 'filelist.txt', '-i', audio_file, '-c:v', 'libx264', '-pix_fmt', 'yuv420p', output_file])
os.remove('filelist.txt')
def download_music(search_query):
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'outtmpl': 'background_music.%(ext)s'
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
ydl.download([f"ytsearch1:{search_query}"])
return "background_music.mp3"
except:
print("Error downloading music. Please check your internet connection or try a different search query.")
return None
def trim_audio(audio_path, target_duration):
output_path = "trimmed_background_music.mp3"
cmd = ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', audio_path]
result = subprocess.run(cmd, capture_output=True, text=True)
data = json.loads(result.stdout)
audio_duration = float(data['format']['duration'])
if audio_duration > target_duration:
start_time = (audio_duration - target_duration) / 2
cmd = ['ffmpeg', '-i', audio_path, '-ss', str(start_time), '-t', str(target_duration), '-c', 'copy', output_path]
subprocess.run(cmd)
return output_path
else:
return audio_path
def combine_audio_with_video(video_path, bg_music_path, bg_music_volume=0.3):
temp_output = "temp_output.mp4"
cmd = [
'ffmpeg',
'-i', video_path,
'-i', bg_music_path,
'-filter_complex',
f'[1:a]volume={bg_music_volume}[bg];[0:a][bg]amix=inputs=2:duration=longest',
'-c:v', 'copy',
'-c:a', 'aac',
'-b:a', '192k',
temp_output
]
subprocess.run(cmd)
os.remove(video_path)
os.rename(temp_output, video_path)
def main(title):
# Create sessions directory if it doesn't exist
sessions_dir = "sessions"
os.makedirs(sessions_dir, exist_ok=True)
# Create session directory
session_dir = os.path.join(sessions_dir, next(tempfile._get_candidate_names()))
os.makedirs(session_dir, exist_ok=True)
os.chdir(session_dir)
try:
details, release_year = get_movie_or_tv_details(title)
media_type = details['media_type']
media_id = details['id']
summary = details.get('overview', 'No summary available.')
images_data = get_images_url(media_type, media_id)
images = images_data.get('backdrops', [])
save_summary_and_images(title, summary, images, media_type, release_year)
prefix_text = "Here is the summary of the"
suffix_text = "Have you watched it? Do tell us in the comments section."
process_text_files(prefix_text, suffix_text)
audio_file = next((f for f in os.listdir('.') if f.lower().endswith('.mp3')), None)
if audio_file:
make_slideshow('images', audio_file)
output_video_file = None
mp4_files = [f for f in os.listdir() if f.endswith('.mp4')]
for video_file in mp4_files:
video_duration = float(subprocess.run(['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', video_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout)
search_query = f"{os.path.splitext(video_file)[0]} background music"
music_file = download_music(search_query)
if music_file:
trimmed_music = trim_audio(music_file, video_duration)
combine_audio_with_video(video_file, trimmed_music)
if os.path.exists(music_file):
os.remove(music_file)
if os.path.exists(trimmed_music) and trimmed_music != music_file:
os.remove(trimmed_music)
output_video_file = video_file
# Move the final video to the main directory
if output_video_file:
final_video_path = os.path.join("..", output_video_file)
shutil.move(output_video_file, final_video_path)
return gr.File(final_video_path)
else:
return "Error creating video."
except Exception as e:
return f"An error occurred: {e}"
finally:
# Ensure session_dir exists before deleting
if os.path.exists(session_dir):
shutil.rmtree(session_dir)
with gr.Blocks() as iface:
gr.Interface(
fn=main,
inputs=gr.Textbox(label="Enter the movie or TV show title:"),
outputs=gr.Video(label="Output Video"),
title="Movie/TV Show Video Summary Generator",
description="Enter a movie or TV show title to generate a summary video with voiceover and background music.<br><h2>Make sure to support me USDT (TRC-20): TAe7hsSVWtMEYz3G5V1UiUdYPQVqm28bKx</h2>"
)
iface.launch()