Spaces:
Sleeping
Sleeping
File size: 3,619 Bytes
5ca3290 f88f6dd 5ca3290 f88f6dd 5ca3290 7cbd2ac 5ca3290 7e7a7a9 5ca3290 a351869 5ca3290 e6a27f0 5ca3290 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
import json
import os
import requests
import logging
logger = logging.getLogger(__name__)
from langchain.text_splitter import RecursiveCharacterTextSplitter, TokenTextSplitter
from typing import List, Dict
SUBTITLE_DOWNLOADER_URL = 'https://savesubs.com'
def fetchYoutubeSubtitleUrls(video_id):
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'cache-control': 'no-cache',
'Content-Type': 'application/json; charset=UTF-8',
'pragma': 'no-cache',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
'x-auth-token': os.environ.get('SAVESUBS_X_AUTH_TOKEN', ''),
'x-requested-domain': 'savesubs.com',
'X-requested-with': 'xmlhttprequest',
'sec-ch-ua': '"Google Chrome";v="111", "Not(A:Brand";v="8", "Chromium";v="111"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': 'Linux',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'authority': 'savesubs.com',
'origin': 'https://savesubs.com',
'referer': f'https://savesubs.com/process?url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3D{video_id}'
}
data = {
'data': {'url': f'https://www.youtube.com/watch?v={video_id}'}
}
session = requests.Session()
proxy = os.environ.get('PROXY', None)
if proxy:
session.proxies = {
"http": proxy,
"https": proxy,
}
response = session.post(SUBTITLE_DOWNLOADER_URL + '/action/extract', json=data, headers=headers)
if response.status_code != 200:
logger.error("response.status_code: {}".format(response.status_code))
return {'title': None, 'subtitleList': None, 'error': response.reason}
else:
try:
json = response.json().get('response', {})
logger.info('subtitle url json: {}'.format(json))
return {'title': json.get('title'), 'subtitleList': json.get('formats')}
except Exception as error:
logger.error(error)
return {'title': None, 'subtitleList': None, 'error': str(error)}
async def find(subtitleList: List[Dict], args: Dict) -> Dict:
key = list(args.keys())[0]
return next((item for item in subtitleList if item[key] == args[key]), None)
async def fetchYoutubeSubtitle(videoId: str) -> Dict:
subtitle_url = ""
betterSubtitle = {}
subtitleList = []
title = ""
error = ""
result = fetchYoutubeSubtitleUrls(videoId)
title, subtitleList, error = result["title"], result["subtitleList"], result["error"]
if not subtitleList or len(subtitleList) <= 0:
return {"title": title, "docs": None, "error": error}
betterSubtitle = (
await find(subtitleList, {"quality": "English"})
or await find(subtitleList, {"quality": "English (auto"})
or await find(subtitleList, {"quality": "zh-CN"})
or subtitleList[0]
)
subtitleUrl = f"{SUBTITLE_DOWNLOADER_URL}{betterSubtitle['url']}?ext=srt"
session = requests.Session()
proxy = os.environ.get('PROXY', None)
if proxy:
session.proxies = {
"http": proxy,
"https": proxy,
}
response = session.get(url)
loader = SRTLoader(await response.blob())
splitter = TokenTextSplitter(
encodingName="cl100k_base",
chunkSize=2048,
)
docs = await loader.loadAndSplit(splitter)
return {"title": title, "docs": docs} |