Spaces:
Sleeping
Sleeping
import os | |
import json | |
import math | |
import re | |
import time | |
import traceback | |
from typing import Optional | |
import xml.etree.ElementTree as ElementTree | |
from html import unescape | |
from yt_dlp import YoutubeDL, DownloadError | |
from yt_dlp.networking import Request | |
from yt_dlp.utils import sanitize_filename, random_user_agent | |
NO_RETRY_STR = [ | |
"Sorry about that", | |
"unavailable", | |
"not available", | |
] | |
RETRY_STR = [ | |
"URLError", | |
"429", | |
"IncompleteRead", | |
"Remote end closed connection", | |
# "No video formats found", | |
] | |
debug = os.getenv("DEBUG") | |
# yt-dlp subtitle types: json3,srv1,srv2,srv3,ttml,vtt, xml(youtube url with out extargs) | |
# "subtitles": { | |
# "live_chat": [ | |
# { | |
# "url": "https://www.youtube.com/watch?v=ANtM2bHRz04&bpctr=9999999999&has_verified=1", | |
# "ext": "json", | |
# "video_id": "ANtM2bHRz04", | |
# "protocol": "youtube_live_chat_replay" | |
# } | |
# ] | |
# } | |
def getSubtitleOptions( | |
lang: Optional[str] = None, | |
proxy: Optional[str] = None, | |
): | |
ydl_opts = { | |
"noplaylist": True, | |
"writesubtitles": True, | |
"writeautomaticsub": True, | |
# "listsubtitles": True, | |
# "subtitlesformat": subType, # mark due to default youtube no srt and xml format | |
"skip_download": True, | |
"socket_timeout": 10, | |
"extractor_retries": 0, | |
# "debug_printtraffic": True, | |
"extractor_args": { | |
"youtube": { | |
"player_skip": [ | |
"configs", | |
"initial", | |
], # skip "webpage" will cause l2P5PgL1LfI missing some langs, | |
"player_client": ["ios"], | |
"skip": ["hls", "dash"], # don't skip "translated_subs" | |
} | |
}, | |
} | |
if lang: | |
ydl_opts.update( | |
{ | |
"subtitleslangs": [ | |
lang, | |
"-live_chat", | |
] | |
} | |
) # filter live chat to requested_subtitles | |
if proxy: | |
ydl_opts.update({"proxy": proxy, "socket_timeout": 20}) | |
print(ydl_opts) | |
return ydl_opts | |
def getUrlFromSubtitleItem(item, lang="en", subType="vtt"): | |
# print("item: {}, lang: {}, subType: {}".format(item, lang, subType)) | |
for subtitle in item[lang]: | |
if lang != "live_chat" and subType == "xml": | |
if debug: | |
print( | |
"subtitle source lang:{} url: {}".format(lang, subtitle.get("url")) | |
) | |
return subtitle.get("url").replace("&fmt=" + subtitle.get("ext"), "") | |
if subtitle.get("ext") == subType: | |
if debug: | |
print("subtitle lang:{} url: {}".format(lang, subtitle.get("url"))) | |
return subtitle.get("url") | |
return None | |
def getRequestedSubtitlesUrl(info_dict, lang, subType, isLangKey=False): | |
item = info_dict.get("requested_subtitles") | |
if not item: | |
return None | |
langs = item.keys() | |
if lang in langs: | |
item = {lang: [item[lang]]} if type(item[lang]) == dict else item | |
url = getUrlFromSubtitleItem(item, lang, subType) | |
if url: | |
if debug: | |
print("getRequestedSubtitlesUrl lang:{}".format(lang)) | |
return url | |
if not isLangKey: | |
for l in langs: | |
if l.startswith(lang): | |
item = {l: [item[l]]} if type(item[l]) == dict else item | |
url = getUrlFromSubtitleItem(item, l, subType) | |
if url: | |
if debug: | |
print("getRequestedSubtitlesUrl lang:{} url:{}".format(l, url)) | |
return url | |
return None | |
def getSubtitleLangUrl( | |
info_dict, | |
lang="en", | |
subType="vtt", | |
subTitleKeys=["subtitles", "automatic_captions"], | |
isLangKey=False, | |
): | |
for subtitle_item in subTitleKeys: | |
langs = info_dict.get(subtitle_item).keys() | |
if lang in langs: | |
url = getUrlFromSubtitleItem(info_dict.get(subtitle_item), lang, subType) | |
if url: | |
if debug: | |
print("getSubtitleLangUrl lang:{}".format(lang)) | |
return url | |
if not isLangKey: | |
for subtitle_item in subTitleKeys: | |
langs = info_dict.get(subtitle_item).keys() | |
for l in langs: | |
if l.startswith(lang): | |
url = getUrlFromSubtitleItem( | |
info_dict.get(subtitle_item), l, subType | |
) | |
if url: | |
if debug: | |
print("getSubtitleLangUrl lang:{} url:{}".format(l, url)) | |
return url | |
return None | |
def getSubtitleOtherUrl( | |
info_dict, | |
lang="en", | |
subType="vtt", | |
subTitleKeys=["subtitles", "automatic_captions"], | |
): | |
for subtitle_item in subTitleKeys: | |
langs = info_dict.get(subtitle_item).keys() | |
if len(langs) == 0: | |
continue | |
l = lang if lang in langs else ("en" if "en" in langs else list(langs)[0]) | |
if l is None: | |
continue | |
url = getUrlFromSubtitleItem(info_dict.get(subtitle_item), l, subType) | |
if url: | |
if debug: | |
print("getSubtitleOtherUrl lang:{} url:{}".format(l, url)) | |
return url | |
return None | |
async def fetchSubtitle( | |
url: str, | |
lang: Optional[str] = "en", | |
subType: Optional[str] = "vtt", | |
proxy: Optional[str] = None, | |
) -> dict: | |
return await fetchAnySubtitle(url, lang, subType, proxy) | |
async def fetchAnySubtitle( | |
url: str, | |
lang: Optional[str] = "en", | |
subType: Optional[str] = "vtt", | |
proxy: Optional[str] = None, | |
) -> dict: | |
# lang-code or lang.* .* is regex | |
# reqLang = lang if len(lang.split("-")) > 1 or lang.endswith(".*") else lang + ".*" | |
title = "unknow" | |
duration = "" | |
try: | |
ydl, info_dict = extractInfo(url, lang, proxy, False) | |
# print(json.dumps(info_dict)) | |
title = sanitize_filename(info_dict.get("title", "unknow")) | |
seconds = info_dict.get("duration") | |
duration = str(seconds) if seconds else "" | |
thumbnail = info_dict.get("thumbnail") | |
if ".webp" in thumbnail: | |
thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format( | |
info_dict.get("id") | |
) | |
reqType = subType | |
if info_dict.get("extractor") == "youtube" and subType in ["srt", "txt"]: | |
reqType = "xml" | |
if debug: | |
print( | |
"subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format( | |
info_dict.get("subtitles").keys(), | |
info_dict.get("automatic_captions").keys(), | |
( | |
info_dict.get("requested_subtitles").keys() | |
if info_dict.get("requested_subtitles") | |
else {} | |
), | |
) | |
) | |
subtitle_funcs = [ | |
getRequestedSubtitlesUrl, | |
getSubtitleLangUrl, | |
getSubtitleOtherUrl, | |
] | |
for index in range(len(subtitle_funcs)): | |
subtitle_url = subtitle_funcs[index](info_dict, lang, reqType) | |
if subtitle_url: | |
# print("subtitle_url: {}".format(subtitle_url)) | |
subtitle = fetchSubtitleBydlUrl(subType, subtitle_url, ydl=ydl) | |
print( | |
"function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format( | |
index, url, title, duration, len(subtitle or "") | |
) | |
) | |
if subtitle is not None: | |
return { | |
"id": info_dict.get("id"), | |
"url": url, | |
"title": title, | |
"thumbnail": thumbnail, | |
"duration": duration, | |
"subtitle": subtitle, | |
"chapters": info_dict.get("chapters", None), | |
} | |
except Exception as e: | |
print("{}, {}".format(e, url)) | |
traceback.print_exc() | |
return {"error": str(e)} | |
return {"title": title, "duration": duration, "error": "No subtitles"} | |
def float_to_srt_time_format(d: float) -> str: | |
"""Convert decimal durations into proper srt format. | |
:rtype: str | |
:returns: | |
SubRip Subtitle (str) formatted time duration. | |
float_to_srt_time_format(3.89) -> '00:00:03,890' | |
""" | |
fraction, whole = math.modf(d) | |
time_fmt = time.strftime("%H:%M:%S,", time.gmtime(whole)) | |
ms = f"{fraction:.3f}".replace("0.", "") | |
return time_fmt + ms | |
def is_spaces_only(variable): | |
for char in variable: | |
if not char.isspace(): | |
return False | |
return True | |
def xml_caption_to_srt(xml_captions: str, skip_empty: bool = True) -> str: | |
"""Convert xml caption tracks to "SubRip Subtitle (srt)". | |
:param str xml_captions: | |
XML formatted caption tracks. | |
""" | |
segments = [] | |
root = ElementTree.fromstring(xml_captions) | |
for i, child in enumerate(list(root)): | |
text = child.text or "" | |
caption = unescape( | |
text.replace("\n", " ").replace(" ", " "), | |
) | |
if skip_empty and len(caption) == 0 or is_spaces_only(caption): | |
continue | |
try: | |
duration = float(child.attrib["dur"]) | |
except KeyError: | |
duration = 0.0 | |
start = float(child.attrib["start"]) | |
end = start + duration | |
sequence_number = i + 1 # convert from 0-indexed to 1. | |
line = "{seq}\n{start} --> {end}\n{text}\n".format( | |
seq=sequence_number, | |
start=float_to_srt_time_format(start), | |
end=float_to_srt_time_format(end), | |
text=caption, | |
) | |
segments.append(line) | |
if skip_empty: | |
# return None if no text in xml | |
return "\n".join(segments).strip() if len(segments) > 0 else None | |
return "\n".join(segments).strip() | |
def xml_caption_to_txt(xml_captions: str, skip_empty: bool = True) -> str: | |
"""Convert xml caption tracks to "SubRip Subtitle (srt)". | |
:param str xml_captions: | |
XML formatted caption tracks. | |
""" | |
segments = [] | |
root = ElementTree.fromstring(xml_captions) | |
for i, child in enumerate(list(root)): | |
text = child.text or "" | |
caption = unescape( | |
text.replace("\n", " ").replace(" ", " "), | |
) | |
if skip_empty and (len(caption) == 0 or is_spaces_only(caption)): | |
continue | |
line = "{text}\n".format(text=caption) | |
segments.append(line) | |
if skip_empty: | |
"\n".join(segments).strip() if len(segments) > 0 else None | |
return "\n".join(segments).strip() | |
async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> json: | |
try: | |
_, info_dict = extractInfo(url, None, proxy, True) | |
title = sanitize_filename(info_dict.get("title", "unknow")) | |
seconds = info_dict.get("duration") | |
duration = str(seconds) if seconds else "" | |
thumbnail = info_dict.get("thumbnail") | |
if ".webp" in thumbnail: | |
thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format( | |
info_dict.get("id") | |
) | |
return { | |
"id": info_dict.get("id"), | |
"url": url, | |
"title": title, | |
"thumbnail": thumbnail, | |
"duration": duration, | |
"subtitles": info_dict.get("subtitles"), | |
"automatic_captions": info_dict.get("automatic_captions"), | |
} | |
except Exception as e: | |
print("{}, {}".format(e, url)) | |
traceback.print_exc() | |
return {"error": str(e)} | |
def createHeaders(): | |
return { | |
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
"Accept-Charset": "ISO-8859-1,utf-8;q=0.7,*;q=0.7", | |
"Accept-Encoding": "gzip, deflate", | |
"Accept-Language": "en-us,en;q=0.5", | |
"User-Agent": random_user_agent(), | |
} | |
def fetchSubtitleBydlUrl(subType, dlUrl, skipEmpty=True, ydl=None): | |
dlUrl = dlUrl if subType not in ["srt", "txt"] else re.sub(r"&fmt=[\w]+", "", dlUrl) | |
# if download mailed we may contain headers and cookies in info and use it here. | |
try: | |
ydl = ydl if ydl else YoutubeDL(getSubtitleOptions()) | |
with ydl.urlopen(Request(dlUrl, headers=createHeaders())) as resp: | |
if subType == "srt": | |
return xml_caption_to_srt(resp.read().decode(), skipEmpty) | |
elif subType == "txt": | |
return xml_caption_to_txt(resp.read().decode(), skipEmpty) | |
else: | |
return resp.read().decode() | |
except Exception as e: | |
print(e) | |
return None | |
def getSubtitleUrlByLang(info_dict, lang, subType, isLangKey): | |
subtitle_funcs = [ | |
getRequestedSubtitlesUrl, | |
getSubtitleLangUrl, | |
] | |
for index in range(len(subtitle_funcs)): | |
subtitle_url = subtitle_funcs[index]( | |
info_dict, lang, subType, isLangKey=isLangKey | |
) | |
print("getSubtitleUrlByLang subtitle_url: {}".format(subtitle_url)) | |
if subtitle_url: | |
return subtitle_url | |
def extractInfo(url, lang, proxy, forceProxy=False): | |
max_retry = 2 | |
retry = 0 | |
http_proxy = proxy if forceProxy else None | |
errMsg = None | |
while retry < max_retry: | |
try: | |
ydl_opts = getSubtitleOptions(lang, http_proxy) | |
ydl = YoutubeDL(ydl_opts) | |
return ydl, ydl.extract_info(url, download=False) | |
except DownloadError as e: | |
errMsg = str(e) | |
if "429" in errMsg: | |
http_proxy = proxy | |
if any(s in errMsg for s in NO_RETRY_STR): | |
# print("{}, {}".format(e, url)) | |
break | |
if not any(s in errMsg for s in RETRY_STR): | |
# print("{}, {}".format(e, url)) | |
break | |
retry += 1 | |
except Exception as e: | |
print(e) | |
traceback.print_exc() | |
raise e | |
raise Exception(errMsg) | |
async def fetchSubtitleByInfo( | |
url: str, subType: str, dlInfo, proxy: Optional[str] = None | |
): | |
try: | |
reqType = "xml" if subType in ["srt", "txt"] else subType | |
subtitle = None | |
if "dlUrl" in dlInfo: | |
subtitle = fetchSubtitleBydlUrl(subType, dlInfo.get("dlUrl"), False) | |
if subtitle is not None: | |
return subtitle | |
ydl, info_dict = extractInfo(url, dlInfo.get("lang", None), proxy, False) | |
if debug: | |
print( | |
"subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format( | |
info_dict.get("subtitles").keys(), | |
info_dict.get("automatic_captions").keys(), | |
( | |
info_dict.get("requested_subtitles").keys() | |
if info_dict.get("requested_subtitles") | |
else {} | |
), | |
) | |
) | |
subtitleUrl = None | |
if "langKey" in dlInfo: | |
subtitleUrl = getSubtitleUrlByLang( | |
info_dict, dlInfo.get("langKey"), reqType, True | |
) | |
if subtitleUrl is None: | |
subtitleUrl = getSubtitleUrlByLang( | |
info_dict, dlInfo.get("lang"), reqType, False | |
) | |
print("subtitleUrl: {}".format(subtitleUrl)) | |
subtitle = fetchSubtitleBydlUrl(subType, subtitleUrl, False, ydl) | |
return subtitle | |
except Exception as e: | |
print("{}, {}".format(e, url)) | |
traceback.print_exc() | |
return {"error": str(e)} | |