ytdlp_subtitle / fetchYoutubeSubtitle.py
lanbogao's picture
1. Add retry to extract video, and use proxy when retry if has proxy.
eb92911
raw
history blame
15.7 kB
import os
import json
import math
import re
import time
import traceback
from typing import Optional
import xml.etree.ElementTree as ElementTree
from html import unescape
from yt_dlp import YoutubeDL, DownloadError
from yt_dlp.networking import Request
from yt_dlp.utils import sanitize_filename, random_user_agent
# Substrings of a DownloadError message that make extractInfo give up
# immediately instead of retrying.
NO_RETRY_STR = [
    "Sorry about that",
    "unavailable",
    "not available",
]
# Substrings of a DownloadError message that extractInfo treats as worth
# retrying; an error matching none of them is not retried either.
RETRY_STR = [
    "URLError",
    "429",
    "IncompleteRead",
    "Remote end closed connection",
    # "No video formats found",
]
# Raw value of the DEBUG environment variable (None when unset). Any
# non-empty value turns on the extra diagnostic prints in this module.
debug = os.getenv("DEBUG")
# yt-dlp subtitle types: json3, srv1, srv2, srv3, ttml, vtt, xml (the YouTube URL without its "&fmt=" argument)
# "subtitles": {
# "live_chat": [
# {
# "url": "https://www.youtube.com/watch?v=ANtM2bHRz04&bpctr=9999999999&has_verified=1",
# "ext": "json",
# "video_id": "ANtM2bHRz04",
# "protocol": "youtube_live_chat_replay"
# }
# ]
# }
def getSubtitleOptions(
    lang: Optional[str] = None,
    proxy: Optional[str] = None,
):
    """Build the yt-dlp option dict used for subtitle-only extraction.

    :param lang: when truthy, restricts the requested subtitles to this
        language and excludes the ``live_chat`` track.
    :param proxy: when truthy, is set as the ``proxy`` option and the socket
        timeout is raised from 10 to 20 seconds.
    :returns: the option dict (it is also printed to stdout).
    """
    youtube_args = {
        # Skipping "webpage" as well makes l2P5PgL1LfI miss some languages.
        "player_skip": ["configs", "initial"],
        "player_client": ["ios"],
        # "translated_subs" is deliberately not skipped.
        "skip": ["hls", "dash"],
    }
    options = {
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        # "subtitlesformat" is left unset: YouTube offers no srt/xml format
        # by default.
        "skip_download": True,
        "socket_timeout": 10,
        "extractor_retries": 0,
        "extractor_args": {"youtube": youtube_args},
    }

    if lang:
        # "-live_chat" filters live chat out of requested_subtitles.
        options["subtitleslangs"] = [lang, "-live_chat"]

    if proxy:
        options["proxy"] = proxy
        options["socket_timeout"] = 20

    print(options)
    return options
def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
    """Pick a download URL for ``lang`` out of a subtitle mapping.

    :param item: mapping of language key to a list of subtitle format dicts
        (each read for its ``url`` and ``ext`` entries).
    :param lang: language key to look up; must be present in ``item``.
    :param subType: wanted format. ``"xml"`` is special: for any track other
        than ``live_chat`` the first usable URL is returned with its
        ``&fmt=<ext>`` argument removed.
    :returns: the URL, or ``None`` when no entry matches.
    """
    for subtitle in item[lang]:
        url = subtitle.get("url")
        if not url:
            # Entries without a URL cannot be downloaded; skip them instead
            # of crashing on None (yt-dlp documents inline "data" entries
            # as an alternative to "url").
            continue
        ext = subtitle.get("ext")
        if lang != "live_chat" and subType == "xml":
            if debug:
                print("subtitle source lang:{} url: {}".format(lang, url))
            # Without an ext there is no fmt argument to strip.
            return url.replace("&fmt=" + ext, "") if ext else url
        if ext == subType:
            if debug:
                print("subtitle lang:{} url: {}".format(lang, url))
            return url
    return None
def getRequestedSubtitlesUrl(info_dict, lang, subType, isLangKey=False):
    """Find a subtitle URL among ``info_dict["requested_subtitles"]``.

    The exact ``lang`` key is tried first. Unless ``isLangKey`` is true,
    every key starting with ``lang`` is then tried in turn.

    :param info_dict: extracted info; only ``requested_subtitles`` is read.
    :param lang: language key or prefix.
    :param subType: wanted format, forwarded to ``getUrlFromSubtitleItem``.
    :param isLangKey: when true, ``lang`` must match a key exactly.
    :returns: the URL, or ``None`` when nothing matches.
    """
    requested = info_dict.get("requested_subtitles")
    if not requested:
        return None

    def lookup(key):
        # Each value may be a single format dict or a list of them. Wrap it
        # in a fresh mapping rather than rebinding ``requested``, so later
        # keys can still be looked up in the original mapping.
        entry = requested[key]
        formats = [entry] if isinstance(entry, dict) else entry
        return getUrlFromSubtitleItem({key: formats}, key, subType)

    if lang in requested:
        url = lookup(lang)
        if url:
            if debug:
                print("getRequestedSubtitlesUrl lang:{}".format(lang))
            return url

    if isLangKey:
        return None

    for key in requested:
        if not key.startswith(lang):
            continue
        url = lookup(key)
        if url:
            if debug:
                print("getRequestedSubtitlesUrl lang:{} url:{}".format(key, url))
            return url
    return None
def getSubtitleLangUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
    isLangKey=False,
):
    """Find a subtitle URL for ``lang`` in the given subtitle sources.

    An exact ``lang`` match is searched in every source first. Unless
    ``isLangKey`` is true, keys that merely start with ``lang`` are then
    tried, again source by source.

    :param info_dict: extracted info holding the subtitle mappings.
    :param lang: language key or prefix.
    :param subType: wanted format, forwarded to ``getUrlFromSubtitleItem``.
    :param subTitleKeys: ``info_dict`` keys to search, in priority order.
    :param isLangKey: when true, ``lang`` must match a key exactly.
    :returns: the URL, or ``None`` when nothing matches.
    """
    # A missing or None source is treated as empty instead of raising
    # AttributeError on ``None.keys()``.
    sources = [info_dict.get(key) or {} for key in subTitleKeys]

    for tracks in sources:
        if lang in tracks:
            url = getUrlFromSubtitleItem(tracks, lang, subType)
            if url:
                if debug:
                    print("getSubtitleLangUrl lang:{}".format(lang))
                return url

    if isLangKey:
        return None

    for tracks in sources:
        for key in tracks:
            if not key.startswith(lang):
                continue
            url = getUrlFromSubtitleItem(tracks, key, subType)
            if url:
                if debug:
                    print("getSubtitleLangUrl lang:{} url:{}".format(key, url))
                return url
    return None
def getSubtitleOtherUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
):
    """Fall back to any available subtitle track.

    For each source the preferred key is ``lang`` if present, else ``"en"``,
    else the first key of that source.

    :param info_dict: extracted info holding the subtitle mappings.
    :param lang: preferred language key.
    :param subType: wanted format, forwarded to ``getUrlFromSubtitleItem``.
    :param subTitleKeys: ``info_dict`` keys to search, in priority order.
    :returns: the URL, or ``None`` when no source yields one.
    """
    for source_key in subTitleKeys:
        # A missing or None source is skipped instead of raising on .keys().
        tracks = info_dict.get(source_key) or {}
        if not tracks:
            continue

        if lang in tracks:
            chosen = lang
        elif "en" in tracks:
            chosen = "en"
        else:
            chosen = next(iter(tracks))

        url = getUrlFromSubtitleItem(tracks, chosen, subType)
        if url:
            if debug:
                print("getSubtitleOtherUrl lang:{} url:{}".format(chosen, url))
            return url
    return None
async def fetchSubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Fetch a subtitle for ``url``; delegates unchanged to ``fetchAnySubtitle``."""
    result = await fetchAnySubtitle(url=url, lang=lang, subType=subType, proxy=proxy)
    return result
async def fetchAnySubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Extract video info and return the best matching subtitle.

    Lookup order: the subtitles yt-dlp selected for ``lang``, then any
    subtitle or automatic caption whose key matches ``lang``, then any
    available track at all.

    :param url: video URL.
    :param lang: language key or prefix.
    :param subType: wanted format. For the ``youtube`` extractor, ``srt`` and
        ``txt`` are produced by converting the xml track.
    :param proxy: proxy passed to ``extractInfo`` (not forced).
    :returns: a dict with id/url/title/thumbnail/duration/subtitle/chapters
        on success, ``{"title", "duration", "error": "No subtitles"}`` when
        nothing could be fetched, or ``{"error": ...}`` on an exception.
    """
    # lang-code or lang.* (".*" is a regex)
    title = "unknow"
    duration = ""
    ydl = None
    try:
        ydl, info_dict = extractInfo(url, lang, proxy, False)
        title = sanitize_filename(info_dict.get("title") or "unknow")
        seconds = info_dict.get("duration")
        duration = str(seconds) if seconds else ""

        thumbnail = info_dict.get("thumbnail")
        # thumbnail may be absent; only rewrite it when it is a .webp URL.
        if thumbnail and ".webp" in thumbnail:
            thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                info_dict.get("id")
            )

        reqType = subType
        if info_dict.get("extractor") == "youtube" and subType in ("srt", "txt"):
            reqType = "xml"

        if debug:
            print(
                "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                    (info_dict.get("subtitles") or {}).keys(),
                    (info_dict.get("automatic_captions") or {}).keys(),
                    (
                        info_dict.get("requested_subtitles").keys()
                        if info_dict.get("requested_subtitles")
                        else {}
                    ),
                )
            )

        subtitle_funcs = [
            getRequestedSubtitlesUrl,
            getSubtitleLangUrl,
            getSubtitleOtherUrl,
        ]
        for index, find_url in enumerate(subtitle_funcs):
            subtitle_url = find_url(info_dict, lang, reqType)
            if not subtitle_url:
                continue
            subtitle = fetchSubtitleBydlUrl(subType, subtitle_url, ydl=ydl)
            print(
                "function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format(
                    index, url, title, duration, len(subtitle or "")
                )
            )
            if subtitle is not None:
                return {
                    "id": info_dict.get("id"),
                    "url": url,
                    "title": title,
                    "thumbnail": thumbnail,
                    "duration": duration,
                    "subtitle": subtitle,
                    "chapters": info_dict.get("chapters", None),
                }
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}
    finally:
        # Release the extractor's network resources once we are done.
        if ydl is not None:
            ydl.close()
    return {"title": title, "duration": duration, "error": "No subtitles"}
def float_to_srt_time_format(d: float) -> str:
    """Convert a duration in seconds into an SRT timestamp.

    The value is rounded to whole milliseconds before being split, so a
    fraction that rounds up carries into the seconds field (3.9996 becomes
    ``00:00:04,000``). Hours are not wrapped at 24. Negative input is
    clamped to zero.

    :rtype: str
    :returns:
        SubRip Subtitle (str) formatted time duration.

    float_to_srt_time_format(3.89) -> '00:00:03,890'
    """
    total_ms = max(0, round(d * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}"
def is_spaces_only(variable):
    """Return True when every character of ``variable`` is whitespace.

    An empty value has no non-whitespace character, so it yields True.
    """
    return all(ch.isspace() for ch in variable)
def xml_caption_to_srt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to "SubRip Subtitle (srt)".

    :param str xml_captions:
        XML formatted caption tracks. Each child of the root is read for its
        text and its ``start`` / optional ``dur`` attributes.
    :param bool skip_empty:
        When true, captions that are empty or whitespace-only are dropped,
        and ``None`` is returned if no caption remains. When false, every
        caption is kept.
    :returns:
        The srt text with cues numbered consecutively from 1, or ``None``
        (see ``skip_empty``).
    """
    # NOTE(review): the XML comes from a remote server; ElementTree is not
    # hardened against maliciously constructed documents.
    root = ElementTree.fromstring(xml_captions)

    segments = []
    for child in root:
        text = child.text or ""
        # Newlines become spaces; double spaces are then collapsed.
        caption = unescape(text.replace("\n", " ").replace("  ", " "))

        # strip() leaves nothing for both empty and whitespace-only text.
        if skip_empty and not caption.strip():
            continue

        start = float(child.attrib["start"])
        duration = float(child.attrib.get("dur", 0.0))
        end = start + duration

        line = "{seq}\n{start} --> {end}\n{text}\n".format(
            # Number by emitted cue so skipped captions leave no gaps.
            seq=len(segments) + 1,
            start=float_to_srt_time_format(start),
            end=float_to_srt_time_format(end),
            text=caption,
        )
        segments.append(line)

    if skip_empty and not segments:
        # No text at all in the xml.
        return None
    return "\n".join(segments).strip()
def xml_caption_to_txt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to plain text, one caption per paragraph.

    :param str xml_captions:
        XML formatted caption tracks; the text of each child of the root is
        used.
    :param bool skip_empty:
        When true, captions that are empty or whitespace-only are dropped,
        and ``None`` is returned if no caption remains. When false, every
        caption is kept.
    :returns:
        The captions separated by a blank line, stripped of surrounding
        whitespace, or ``None`` (see ``skip_empty``).
    """
    # NOTE(review): the XML comes from a remote server; ElementTree is not
    # hardened against maliciously constructed documents.
    root = ElementTree.fromstring(xml_captions)

    segments = []
    for child in root:
        text = child.text or ""
        # Newlines become spaces; double spaces are then collapsed.
        caption = unescape(text.replace("\n", " ").replace("  ", " "))

        # strip() leaves nothing for both empty and whitespace-only text.
        if skip_empty and not caption.strip():
            continue
        segments.append("{text}\n".format(text=caption))

    if skip_empty and not segments:
        # No text at all in the xml.
        return None
    return "\n".join(segments).strip()
async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> dict:
    """Extract video metadata together with all available subtitle tracks.

    :param url: video URL.
    :param proxy: proxy passed to ``extractInfo``; it is forced for this call.
    :returns: a dict with id/url/title/thumbnail/duration plus the raw
        ``subtitles`` and ``automatic_captions`` mappings, or
        ``{"error": ...}`` when extraction raises.
    """
    try:
        ydl, info_dict = extractInfo(url, None, proxy, True)
        # Only the metadata is needed; release the extractor right away.
        ydl.close()

        title = sanitize_filename(info_dict.get("title") or "unknow")
        seconds = info_dict.get("duration")
        duration = str(seconds) if seconds else ""

        thumbnail = info_dict.get("thumbnail")
        # thumbnail may be absent; only rewrite it when it is a .webp URL.
        if thumbnail and ".webp" in thumbnail:
            thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                info_dict.get("id")
            )

        return {
            "id": info_dict.get("id"),
            "url": url,
            "title": title,
            "thumbnail": thumbnail,
            "duration": duration,
            "subtitles": info_dict.get("subtitles"),
            "automatic_captions": info_dict.get("automatic_captions"),
        }
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}
def createHeaders():
    """Return browser-like request headers with a random User-Agent."""
    headers = {}
    headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    headers["Accept-Charset"] = "ISO-8859-1,utf-8;q=0.7,*;q=0.7"
    headers["Accept-Encoding"] = "gzip, deflate"
    headers["Accept-Language"] = "en-us,en;q=0.5"
    # A fresh user agent is drawn on every call.
    headers["User-Agent"] = random_user_agent()
    return headers
def fetchSubtitleBydlUrl(subType, dlUrl, skipEmpty=True, ydl=None):
    """Download a subtitle from ``dlUrl`` and convert it to ``subType``.

    :param subType: ``"srt"`` and ``"txt"`` are converted from the xml track
        (the ``&fmt=...`` argument is stripped from the URL first); any
        other value returns the response body as-is.
    :param dlUrl: subtitle download URL; a falsy value yields ``None``.
    :param skipEmpty: forwarded to the xml converters.
    :param ydl: ``YoutubeDL`` instance to download with. When omitted, a
        temporary one is created and closed again afterwards.
    :returns: the subtitle text, or ``None`` when there is no URL, the
        download fails, or the converter returns ``None``.
    """
    if not dlUrl:
        # Nothing to download; also avoids re.sub() raising on None.
        return None

    if subType in ("srt", "txt"):
        dlUrl = re.sub(r"&fmt=[\w]+", "", dlUrl)

    # If the download failed we may have headers and cookies in info that
    # could be reused here.
    owned_ydl = None
    try:
        if not ydl:
            owned_ydl = ydl = YoutubeDL(getSubtitleOptions())
        with ydl.urlopen(Request(dlUrl, headers=createHeaders())) as resp:
            body = resp.read().decode()
        if subType == "srt":
            return xml_caption_to_srt(body, skipEmpty)
        if subType == "txt":
            return xml_caption_to_txt(body, skipEmpty)
        return body
    except Exception as e:
        print(e)
        return None
    finally:
        # Only close the instance created here; a caller-supplied one stays
        # open for the caller to reuse.
        if owned_ydl is not None:
            owned_ydl.close()
def getSubtitleUrlByLang(info_dict, lang, subType, isLangKey):
    """Return the first subtitle URL for ``lang`` found by the lookup helpers.

    The requested subtitles are searched first, then the subtitle and
    automatic-caption mappings. Each attempt's result is printed. ``None``
    is returned when neither lookup finds a URL.
    """
    resolvers = (getRequestedSubtitlesUrl, getSubtitleLangUrl)
    for resolver in resolvers:
        candidate = resolver(info_dict, lang, subType, isLangKey=isLangKey)
        print("getSubtitleUrlByLang subtitle_url: {}".format(candidate))
        if candidate:
            return candidate
    return None
def extractInfo(url, lang, proxy, forceProxy=False):
    """Extract video info with yt-dlp, retrying once on transient errors.

    :param url: video URL.
    :param lang: subtitle language passed to ``getSubtitleOptions``.
    :param proxy: proxy URL. It is used from the first attempt only when
        ``forceProxy`` is true; otherwise it is switched on for the retry
        after an error whose message contains "429".
    :param forceProxy: use ``proxy`` from the first attempt.
    :returns: ``(ydl, info_dict)``; the caller owns the ``YoutubeDL``
        instance.
    :raises Exception: with the last ``DownloadError`` message when
        extraction fails; any other exception is re-raised unchanged.
    """
    max_retry = 2
    http_proxy = proxy if forceProxy else None
    errMsg = None
    last_error = None

    for _ in range(max_retry):
        ydl = None
        try:
            ydl = YoutubeDL(getSubtitleOptions(lang, http_proxy))
            return ydl, ydl.extract_info(url, download=False)
        except DownloadError as e:
            # The failed instance is never handed to a caller; release it.
            if ydl is not None:
                ydl.close()
            last_error = e
            errMsg = str(e)
            if "429" in errMsg:
                http_proxy = proxy
            # Stop on a known-permanent error, or on anything that is not a
            # known-transient one.
            if any(s in errMsg for s in NO_RETRY_STR):
                break
            if not any(s in errMsg for s in RETRY_STR):
                break
        except Exception as e:
            if ydl is not None:
                ydl.close()
            print(e)
            traceback.print_exc()
            raise

    raise Exception(errMsg) from last_error
async def fetchSubtitleByInfo(
    url: str, subType: str, dlInfo, proxy: Optional[str] = None
):
    """Fetch a subtitle described by ``dlInfo``.

    A direct ``dlInfo["dlUrl"]`` is tried first. If it is missing or the
    download fails, the video info is extracted again and the URL is looked
    up by ``dlInfo["langKey"]`` (exact key) and then ``dlInfo["lang"]``
    (key or prefix).

    :param url: video URL.
    :param subType: wanted format; ``srt`` and ``txt`` are converted from
        the xml track.
    :param dlInfo: mapping that may hold ``dlUrl``, ``langKey`` and ``lang``.
    :param proxy: proxy passed to ``extractInfo`` (not forced).
    :returns: the subtitle text, ``None`` when no subtitle could be found or
        downloaded, or ``{"error": ...}`` when an exception is raised.
    """
    ydl = None
    try:
        reqType = "xml" if subType in ("srt", "txt") else subType

        direct_url = dlInfo.get("dlUrl") if "dlUrl" in dlInfo else None
        if direct_url:
            subtitle = fetchSubtitleBydlUrl(subType, direct_url, False)
            if subtitle is not None:
                return subtitle

        lang = dlInfo.get("lang", None)
        ydl, info_dict = extractInfo(url, lang, proxy, False)
        if debug:
            print(
                "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                    (info_dict.get("subtitles") or {}).keys(),
                    (info_dict.get("automatic_captions") or {}).keys(),
                    (
                        info_dict.get("requested_subtitles").keys()
                        if info_dict.get("requested_subtitles")
                        else {}
                    ),
                )
            )

        subtitleUrl = None
        if "langKey" in dlInfo:
            subtitleUrl = getSubtitleUrlByLang(
                info_dict, dlInfo.get("langKey"), reqType, True
            )
        # The prefix lookup calls str.startswith(lang), so skip it when no
        # lang was supplied.
        if subtitleUrl is None and lang:
            subtitleUrl = getSubtitleUrlByLang(info_dict, lang, reqType, False)
        print("subtitleUrl: {}".format(subtitleUrl))

        if subtitleUrl is None:
            # Nothing matched; there is nothing to download.
            return None
        return fetchSubtitleBydlUrl(subType, subtitleUrl, False, ydl)
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}
    finally:
        # Release the extractor's network resources once we are done.
        if ydl is not None:
            ydl.close()