import os
import json
import math
import re
import time
import traceback
from typing import Optional
import xml.etree.ElementTree as ElementTree
from html import unescape

from yt_dlp import YoutubeDL, DownloadError
from yt_dlp.networking import Request
from yt_dlp.utils import sanitize_filename, random_user_agent

# Substrings of a DownloadError message that mean retrying is pointless.
NO_RETRY_STR = [
    "Sorry about that",
    "unavailable",
    "not available",
]
# Substrings of a DownloadError message that are worth one more attempt.
RETRY_STR = [
    "URLError",
    "429",
    "IncompleteRead",
    "Remote end closed connection",
    # "No video formats found",
]
# Any non-empty DEBUG environment variable enables the verbose prints below.
debug = os.getenv("DEBUG")

# Matches runs of two or more spaces (collapsed to one when cleaning captions).
_MULTI_SPACE_RE = re.compile(r" {2,}")

# yt-dlp subtitle types: json3, srv1, srv2, srv3, ttml, vtt,
# xml (the YouTube URL without the "&fmt=<ext>" argument).
# Live chat shows up as a pseudo subtitle track, e.g.:
# "subtitles": {
#     "live_chat": [
#         {
#             "url": "https://www.youtube.com/watch?v=ANtM2bHRz04&bpctr=9999999999&has_verified=1",
#             "ext": "json",
#             "video_id": "ANtM2bHRz04",
#             "protocol": "youtube_live_chat_replay"
#         }
#     ]
# }


def getSubtitleOptions(
    lang: Optional[str] = None,
    proxy: Optional[str] = None,
):
    """Build the yt-dlp option dict used for subtitle-only extraction.

    :param lang: when given, restricts ``subtitleslangs`` to this language
        and excludes the ``live_chat`` pseudo track.
    :param proxy: when given, sets ``proxy`` and raises the socket timeout
        from 10 to 20 seconds.
    :returns: a fresh option dict (also printed to stdout).
    """
    ydl_opts = {
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        # "listsubtitles": True,
        # "subtitlesformat": subType,  # disabled: YouTube offers no srt/xml format by default
        "skip_download": True,
        "socket_timeout": 10,
        "extractor_retries": 0,
        # "debug_printtraffic": True,
        "extractor_args": {
            "youtube": {
                # Skipping "webpage" too makes l2P5PgL1LfI miss some languages.
                "player_skip": [
                    "configs",
                    "initial",
                ],
                "player_client": ["ios"],
                # Do not skip "translated_subs".
                "skip": ["hls", "dash"],
            }
        },
    }
    if lang:
        # "-live_chat" filters live chat out of requested_subtitles.
        ydl_opts.update(
            {
                "subtitleslangs": [
                    lang,
                    "-live_chat",
                ]
            }
        )
    if proxy:
        ydl_opts.update({"proxy": proxy, "socket_timeout": 20})
    print(ydl_opts)
    return ydl_opts


def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
    """Pick a download URL for ``lang`` out of a ``{lang: [track, ...]}`` mapping.

    For ``subType == "xml"`` (and any language except ``live_chat``) the first
    track with a URL is used and its ``&fmt=<ext>`` argument is removed.
    Otherwise the first track whose ``ext`` equals ``subType`` is used.

    :param item: mapping of language key to a list of track dicts
        (each with ``url`` and ``ext``); must contain ``lang``.
    :returns: the URL, or ``None`` when no track matches.
    """
    want_raw_xml = lang != "live_chat" and subType == "xml"
    for subtitle in item[lang]:
        track_url = subtitle.get("url")
        if not track_url:
            # A track without a URL cannot be downloaded; try the next one.
            continue
        ext = subtitle.get("ext")
        if want_raw_xml:
            if debug:
                print("subtitle source lang:{} url: {}".format(lang, track_url))
            return track_url.replace("&fmt=" + ext, "") if ext else track_url
        if ext == subType:
            if debug:
                print("subtitle lang:{} url: {}".format(lang, track_url))
            return track_url
    return None


def getRequestedSubtitlesUrl(info_dict, lang, subType, isLangKey=False):
    """Look up a subtitle URL in ``info_dict["requested_subtitles"]``.

    An exact ``lang`` key is tried first; unless ``isLangKey`` is true, every
    key starting with ``lang`` is tried afterwards.

    :returns: the URL, or ``None`` when nothing matches.
    """
    requested = info_dict.get("requested_subtitles")
    if not requested or lang is None:
        return None

    def lookup(key):
        # Entries may be a single track dict or already a list of tracks.
        # Wrap per lookup instead of rebinding the whole mapping, so later
        # keys can still be read from ``requested``.
        entry = requested[key]
        tracks = [entry] if isinstance(entry, dict) else entry
        return getUrlFromSubtitleItem({key: tracks}, key, subType)

    if lang in requested:
        url = lookup(lang)
        if url:
            if debug:
                print("getRequestedSubtitlesUrl lang:{}".format(lang))
            return url
    if not isLangKey:
        for key in requested:
            if key.startswith(lang):
                url = lookup(key)
                if url:
                    if debug:
                        print(
                            "getRequestedSubtitlesUrl lang:{} url:{}".format(key, url)
                        )
                    return url
    return None


def getSubtitleLangUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
    isLangKey=False,
):
    """Look up a subtitle URL for ``lang`` in the given ``info_dict`` sections.

    All sections are searched for an exact ``lang`` key first; unless
    ``isLangKey`` is true, they are then searched for keys starting with
    ``lang``.

    :param subTitleKeys: names of the ``info_dict`` sections to search, in order.
    :returns: the URL, or ``None`` when nothing matches.
    """
    if lang is None:
        return None
    # A missing or None section is treated as having no languages.
    sections = [info_dict.get(key) or {} for key in subTitleKeys]
    for tracks in sections:
        if lang in tracks:
            url = getUrlFromSubtitleItem(tracks, lang, subType)
            if url:
                if debug:
                    print("getSubtitleLangUrl lang:{}".format(lang))
                return url
    if not isLangKey:
        for tracks in sections:
            for key in tracks:
                if key.startswith(lang):
                    url = getUrlFromSubtitleItem(tracks, key, subType)
                    if url:
                        if debug:
                            print(
                                "getSubtitleLangUrl lang:{} url:{}".format(key, url)
                            )
                        return url
    return None


def getSubtitleOtherUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
):
    """Fallback lookup: take ``lang`` if present, else ``"en"``, else the first language.

    :param subTitleKeys: names of the ``info_dict`` sections to search, in order.
    :returns: the URL, or ``None`` when no section yields one.
    """
    for section_name in subTitleKeys:
        tracks = info_dict.get(section_name) or {}
        if not tracks:
            continue
        if lang in tracks:
            chosen = lang
        elif "en" in tracks:
            chosen = "en"
        else:
            chosen = next(iter(tracks))
        if chosen is None:
            continue
        url = getUrlFromSubtitleItem(tracks, chosen, subType)
        if url:
            if debug:
                print("getSubtitleOtherUrl lang:{} url:{}".format(chosen, url))
            return url
    return None


def _pickThumbnail(info_dict):
    """Return the thumbnail URL, swapping ``.webp`` ones for the hqdefault JPEG."""
    thumbnail = info_dict.get("thumbnail")
    if thumbnail and ".webp" in thumbnail:
        return "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(info_dict.get("id"))
    return thumbnail


def _printSubtitleKeys(info_dict):
    """Debug helper: print the language keys of every subtitle section."""
    requested = info_dict.get("requested_subtitles")
    print(
        "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
            (info_dict.get("subtitles") or {}).keys(),
            (info_dict.get("automatic_captions") or {}).keys(),
            (requested.keys() if requested else {}),
        )
    )


async def fetchSubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Thin alias of :func:`fetchAnySubtitle`."""
    return await fetchAnySubtitle(url, lang, subType, proxy)


async def fetchAnySubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Extract video info and download the best matching subtitle.

    The lookups are tried in order: requested subtitles, exact/prefix language
    match, then any other language.

    :returns: on success a dict with ``id``, ``url``, ``title``, ``thumbnail``,
        ``duration``, ``subtitle`` and ``chapters``; ``{"error": <message>}``
        when an exception occurs; otherwise ``title``/``duration`` plus
        ``"error": "No subtitles"``.
    """
    # lang-code or lang.* (".*" is a regex)
    # reqLang = lang if len(lang.split("-")) > 1 or lang.endswith(".*") else lang + ".*"
    title = "unknow"
    duration = ""
    try:
        ydl, info_dict = extractInfo(url, lang, proxy, False)
        # print(json.dumps(info_dict))
        title = sanitize_filename(info_dict.get("title") or "unknow")
        seconds = info_dict.get("duration")
        duration = str(seconds) if seconds else ""
        thumbnail = _pickThumbnail(info_dict)

        # YouTube has no srt/txt track; fetch the raw XML and convert it.
        reqType = subType
        if info_dict.get("extractor") == "youtube" and subType in ["srt", "txt"]:
            reqType = "xml"
        if debug:
            _printSubtitleKeys(info_dict)

        subtitle_funcs = [
            getRequestedSubtitlesUrl,
            getSubtitleLangUrl,
            getSubtitleOtherUrl,
        ]
        for index, find_url in enumerate(subtitle_funcs):
            subtitle_url = find_url(info_dict, lang, reqType)
            if not subtitle_url:
                continue
            # print("subtitle_url: {}".format(subtitle_url))
            subtitle = fetchSubtitleBydlUrl(subType, subtitle_url, ydl=ydl)
            print(
                "function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format(
                    index, url, title, duration, len(subtitle or "")
                )
            )
            if subtitle is not None:
                return {
                    "id": info_dict.get("id"),
                    "url": url,
                    "title": title,
                    "thumbnail": thumbnail,
                    "duration": duration,
                    "subtitle": subtitle,
                    "chapters": info_dict.get("chapters", None),
                }
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}
    return {"title": title, "duration": duration, "error": "No subtitles"}


def float_to_srt_time_format(d: float) -> str:
    """Convert decimal durations into proper srt format.

    Works on whole milliseconds so a fraction that rounds up carries into the
    seconds field, and hours are not wrapped at 24.

    :rtype: str
    :returns:
        SubRip Subtitle (str) formatted time duration.

    float_to_srt_time_format(3.89) -> '00:00:03,890'
    """
    # Negative input is clamped to zero.
    total_ms = max(0, round(d * 1000))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    seconds, millis = divmod(remainder, 1000)
    return "{:02d}:{:02d}:{:02d},{:03d}".format(hours, minutes, seconds, millis)


def is_spaces_only(variable):
    """Return True when every character is whitespace (also True for an empty string)."""
    return all(char.isspace() for char in variable)


def _iterCaptions(xml_captions: str, skip_empty: bool):
    """Yield ``(index, element, caption_text)`` for each child of the XML root.

    Newlines become spaces, runs of spaces are collapsed and HTML entities are
    unescaped. Empty or whitespace-only captions are skipped only when
    ``skip_empty`` is true; ``index`` always counts every child.
    """
    root = ElementTree.fromstring(xml_captions)
    for index, child in enumerate(root):
        text = child.text or ""
        caption = unescape(_MULTI_SPACE_RE.sub(" ", text.replace("\n", " ")))
        if skip_empty and (len(caption) == 0 or is_spaces_only(caption)):
            continue
        yield index, child, caption


def xml_caption_to_srt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to "SubRip Subtitle (srt)".

    :param str xml_captions:
        XML formatted caption tracks.
    :param bool skip_empty:
        Drop empty/whitespace-only captions, and return ``None`` instead of
        an empty string when nothing is left.
    :returns: the SRT text, or ``None`` (only when ``skip_empty``) if empty.
    :raises KeyError: when a caption element has no ``start`` attribute.
    """
    segments = []
    for index, child, caption in _iterCaptions(xml_captions, skip_empty):
        # A missing "dur" attribute means a zero-length cue.
        duration = float(child.attrib.get("dur", 0.0))
        start = float(child.attrib["start"])
        end = start + duration
        # Sequence number is the element's position in the XML, 1-based.
        segments.append(
            "{seq}\n{start} --> {end}\n{text}\n".format(
                seq=index + 1,
                start=float_to_srt_time_format(start),
                end=float_to_srt_time_format(end),
                text=caption,
            )
        )
    if skip_empty and not segments:
        # Return None if there is no text in the XML.
        return None
    return "\n".join(segments).strip()


def xml_caption_to_txt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to plain text, one caption per paragraph.

    :param str xml_captions:
        XML formatted caption tracks.
    :param bool skip_empty:
        Drop empty/whitespace-only captions, and return ``None`` instead of
        an empty string when nothing is left.
    :returns: the text, or ``None`` (only when ``skip_empty``) if empty.
    """
    segments = [
        "{text}\n".format(text=caption)
        for _, _, caption in _iterCaptions(xml_captions, skip_empty)
    ]
    if skip_empty and not segments:
        # Return None if there is no text in the XML.
        return None
    return "\n".join(segments).strip()


async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> dict:
    """Extract video info (always through ``proxy``) and list its subtitle tracks.

    :returns: a dict with ``id``, ``url``, ``title``, ``thumbnail``,
        ``duration``, ``subtitles`` and ``automatic_captions``, or
        ``{"error": <message>}`` when extraction fails.
    """
    try:
        _, info_dict = extractInfo(url, None, proxy, True)
        title = sanitize_filename(info_dict.get("title") or "unknow")
        seconds = info_dict.get("duration")
        duration = str(seconds) if seconds else ""
        return {
            "id": info_dict.get("id"),
            "url": url,
            "title": title,
            "thumbnail": _pickThumbnail(info_dict),
            "duration": duration,
            "subtitles": info_dict.get("subtitles"),
            "automatic_captions": info_dict.get("automatic_captions"),
        }
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}


def createHeaders():
    """Return browser-like request headers with a random User-Agent."""
    return {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Charset": "ISO-8859-1,utf-8;q=0.7,*;q=0.7",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-us,en;q=0.5",
        "User-Agent": random_user_agent(),
    }


def fetchSubtitleBydlUrl(subType, dlUrl, skipEmpty=True, ydl=None):
    """Download a subtitle from ``dlUrl`` and convert it when needed.

    For ``srt``/``txt`` the ``&fmt=...`` argument is removed from the URL so
    the raw XML is served, which is then converted locally.

    :param ydl: an existing ``YoutubeDL`` to reuse; when omitted a temporary
        one is created and closed afterwards.
    :returns: the subtitle text, or ``None`` on any failure (the error is
        printed) or when ``dlUrl`` is empty.
    """
    if not dlUrl:
        return None
    if subType in ["srt", "txt"]:
        dlUrl = re.sub(r"&fmt=[\w]+", "", dlUrl)
    # If the download failed, the info dict may carry headers and cookies
    # that could be used here.
    owns_ydl = not ydl
    try:
        if owns_ydl:
            ydl = YoutubeDL(getSubtitleOptions())
        try:
            with ydl.urlopen(Request(dlUrl, headers=createHeaders())) as resp:
                body = resp.read().decode()
        finally:
            # Only close the instance created here; a caller-supplied one
            # stays open for the caller.
            if owns_ydl:
                ydl.close()
        if subType == "srt":
            return xml_caption_to_srt(body, skipEmpty)
        if subType == "txt":
            return xml_caption_to_txt(body, skipEmpty)
        return body
    except Exception as e:
        # Deliberate best effort: report and let the caller try another URL.
        print(e)
        return None


def getSubtitleUrlByLang(info_dict, lang, subType, isLangKey):
    """Try requested subtitles, then the regular sections, for ``lang``.

    :returns: the first URL found, or ``None``.
    """
    subtitle_funcs = [
        getRequestedSubtitlesUrl,
        getSubtitleLangUrl,
    ]
    for find_url in subtitle_funcs:
        subtitle_url = find_url(info_dict, lang, subType, isLangKey=isLangKey)
        print("getSubtitleUrlByLang subtitle_url: {}".format(subtitle_url))
        if subtitle_url:
            return subtitle_url
    return None


def extractInfo(url, lang, proxy, forceProxy=False):
    """Run yt-dlp info extraction, with at most one retry.

    The proxy is used from the start only when ``forceProxy`` is true;
    otherwise it is switched on after a "429" error.

    :returns: ``(ydl, info_dict)``; the ``YoutubeDL`` instance is handed back
        so callers can reuse it for the subtitle download.
    :raises Exception: with the last ``DownloadError`` message once retries
        are exhausted or the error is not retryable; any other exception is
        printed and re-raised unchanged.
    """
    max_retry = 2
    retry = 0
    http_proxy = proxy if forceProxy else None
    errMsg = None
    while retry < max_retry:
        try:
            ydl_opts = getSubtitleOptions(lang, http_proxy)
            ydl = YoutubeDL(ydl_opts)
            return ydl, ydl.extract_info(url, download=False)
        except DownloadError as e:
            errMsg = str(e)
            if "429" in errMsg:
                # Rate limited: switch to the proxy for the next attempt.
                http_proxy = proxy
            if any(s in errMsg for s in NO_RETRY_STR):
                # print("{}, {}".format(e, url))
                break
            if not any(s in errMsg for s in RETRY_STR):
                # print("{}, {}".format(e, url))
                break
            retry += 1
        except Exception as e:
            print(e)
            traceback.print_exc()
            raise
    raise Exception(errMsg)


async def fetchSubtitleByInfo(
    url: str, subType: str, dlInfo, proxy: Optional[str] = None
):
    """Download a subtitle described by ``dlInfo``.

    ``dlInfo["dlUrl"]`` is tried directly first. If that fails, the video
    info is extracted again and the URL is looked up by ``dlInfo["langKey"]``
    (exact key) and then by ``dlInfo["lang"]`` (exact or prefix).

    :returns: the subtitle text, ``None`` when it cannot be downloaded, or
        ``{"error": <message>}`` when an exception occurs.
    """
    try:
        reqType = "xml" if subType in ["srt", "txt"] else subType
        if "dlUrl" in dlInfo:
            subtitle = fetchSubtitleBydlUrl(subType, dlInfo.get("dlUrl"), False)
            if subtitle is not None:
                return subtitle

        ydl, info_dict = extractInfo(url, dlInfo.get("lang", None), proxy, False)
        if debug:
            _printSubtitleKeys(info_dict)

        subtitleUrl = None
        if "langKey" in dlInfo:
            subtitleUrl = getSubtitleUrlByLang(
                info_dict, dlInfo.get("langKey"), reqType, True
            )
        if subtitleUrl is None:
            subtitleUrl = getSubtitleUrlByLang(
                info_dict, dlInfo.get("lang"), reqType, False
            )
        print("subtitleUrl: {}".format(subtitleUrl))
        return fetchSubtitleBydlUrl(subType, subtitleUrl, False, ydl)
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}