This commit is contained in:
Mozi 2024-11-18 15:31:39 +05:30 committed by GitHub
commit 5c0017c86e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 372 additions and 168 deletions

View File

@ -3541,7 +3541,8 @@ def ffmpeg_fixup(cndn, msg, cls):
'writing DASH m4a. Only some players support this container',
FFmpegFixupM4aPP)
ffmpeg_fixup(downloader == 'hlsnative' and not self.params.get('hls_use_mpegts')
or info_dict.get('is_live') and self.params.get('hls_use_mpegts') is None,
or info_dict.get('is_live') and self.params.get('hls_use_mpegts') is None
or downloader == 'niconico_live',
'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
FFmpegFixupM3u8PP)
ffmpeg_fixup(downloader == 'dashsegments'

View File

@ -1,12 +1,22 @@
import contextlib
import json
import math
import threading
import time
from . import get_suitable_downloader
from .common import FileDownloader
from .external import FFmpegFD
from ..downloader.fragment import FragmentFD
from ..networking import Request
from ..utils import DownloadError, str_or_none, try_get
from ..networking.exceptions import network_exceptions
from ..utils import (
DownloadError,
RetryManager,
str_or_none,
traverse_obj,
urljoin,
)
class NiconicoDmcFD(FileDownloader):
@ -56,85 +66,184 @@ def heartbeat():
return success
class NiconicoLiveFD(FileDownloader):
""" Downloads niconico live without being stopped """
class NiconicoLiveFD(FragmentFD):
""" Downloads niconico live/timeshift VOD """
def real_download(self, filename, info_dict):
video_id = info_dict['video_id']
ws_url = info_dict['url']
ws_extractor = info_dict['ws']
ws_origin_host = info_dict['origin']
live_quality = info_dict.get('live_quality', 'high')
live_latency = info_dict.get('live_latency', 'high')
dl = FFmpegFD(self.ydl, self.params or {})
_PER_FRAGMENT_DOWNLOAD_RATIO = 0.1
_WEBSOCKET_RECONNECT_DELAY = 10
new_info_dict = info_dict.copy()
new_info_dict.update({
'protocol': 'm3u8',
})
@contextlib.contextmanager
def _ws_context(self, info_dict):
""" Hold a WebSocket object and release it when leaving """
def communicate_ws(reconnect):
if reconnect:
ws = self.ydl.urlopen(Request(ws_url, headers={'Origin': f'https://{ws_origin_host}'}))
if self.ydl.params.get('verbose', False):
self.to_screen('[debug] Sending startWatching request')
ws.send(json.dumps({
'type': 'startWatching',
'data': {
'stream': {
'quality': live_quality,
'protocol': 'hls+fmp4',
'latency': live_latency,
'chasePlay': False,
},
'room': {
'protocol': 'webSocket',
'commentable': True,
},
'reconnect': True,
video_id = info_dict['id']
format_id = info_dict['format_id']
live_latency = info_dict['downloader_options']['live_latency']
ws_url = info_dict['downloader_options']['ws_url']
self.ws = None
self.m3u8_lock = threading.Event()
self.m3u8_url = None
def communicate_ws():
self.ws = self.ydl.urlopen(Request(ws_url, headers=info_dict.get('http_headers')))
if self.ydl.params.get('verbose', False):
self.write_debug('Sending HLS server request')
self.ws.send(json.dumps({
'type': 'startWatching',
'data': {
'stream': {
'quality': format_id,
'protocol': 'hls',
'latency': live_latency,
'chasePlay': False,
},
}))
else:
ws = ws_extractor
with ws:
'room': {
'protocol': 'webSocket',
'commentable': True,
},
},
}))
with self.ws:
while True:
recv = ws.recv()
recv = self.ws.recv()
if not recv:
continue
data = json.loads(recv)
if not data or not isinstance(data, dict):
if not isinstance(data, dict):
continue
if data.get('type') == 'ping':
# pong back
ws.send(r'{"type":"pong"}')
ws.send(r'{"type":"keepSeat"}')
self.ws.send(r'{"type":"pong"}')
self.ws.send(r'{"type":"keepSeat"}')
elif data.get('type') == 'stream':
self.m3u8_url = data['data']['uri']
self.m3u8_lock.set()
elif data.get('type') == 'disconnect':
self.write_debug(data)
return True
return
elif data.get('type') == 'error':
self.write_debug(data)
message = try_get(data, lambda x: x['body']['code'], str) or recv
return DownloadError(message)
message = traverse_obj(data, ('data', 'code')) or recv
raise DownloadError(message)
elif self.ydl.params.get('verbose', False):
if len(recv) > 100:
recv = recv[:100] + '...'
self.to_screen(f'[debug] Server said: {recv}')
self.write_debug(f'Server said: {recv}')
stopped = threading.Event()
def ws_main():
reconnect = False
while True:
while not stopped.is_set():
try:
ret = communicate_ws(reconnect)
if ret is True:
return
except BaseException as e:
self.to_screen('[{}] {}: Connection error occured, reconnecting after 10 seconds: {}'.format('niconico:live', video_id, str_or_none(e)))
time.sleep(10)
continue
finally:
reconnect = True
communicate_ws()
break # Disconnected
except BaseException as e: # Including TransportError
if stopped.is_set():
break
self.m3u8_lock.clear() # m3u8 url may be changed
self.to_screen('[{}] {}: Connection error occured, reconnecting after {} seconds: {}'.format(
'niconico:live', video_id, self._WEBSOCKET_RECONNECT_DELAY, str_or_none(e)))
time.sleep(self._WEBSOCKET_RECONNECT_DELAY)
self.m3u8_lock.set() # Release possible locks
thread = threading.Thread(target=ws_main, daemon=True)
thread.start()
return dl.download(filename, new_info_dict)
try:
yield self
finally:
stopped.set()
if self.ws:
self.ws.close()
thread.join()
def _master_m3u8_url(self):
""" Get the refreshed manifest url after WebSocket reconnection to prevent HTTP 403 """
self.m3u8_lock.wait()
return self.m3u8_url
def real_download(self, filename, info_dict):
with self._ws_context(info_dict) as ws_context:
# live
if info_dict.get('is_live'):
info_dict = info_dict.copy()
info_dict['protocol'] = 'm3u8'
return FFmpegFD(self.ydl, self.params or {}).download(filename, info_dict)
# timeshift VOD
from ..extractor.niconico import NiconicoIE
ie = NiconicoIE(self.ydl)
video_id = info_dict['id']
# Get video info
total_duration = 0
fragment_duration = 0
for line in ie._download_webpage(info_dict['url'], video_id, note='Downloading m3u8').splitlines():
if '#STREAM-DURATION' in line:
total_duration = int(float(line.split(':')[1]))
if '#EXT-X-TARGETDURATION' in line:
fragment_duration = int(line.split(':')[1])
if not (total_duration and fragment_duration):
raise DownloadError('Unable to get required video info')
ctx = {
'filename': filename,
'total_frags': math.ceil(total_duration / fragment_duration),
}
self._prepare_and_start_frag_download(ctx, info_dict)
downloaded_duration = ctx['fragment_index'] * fragment_duration
while True:
if downloaded_duration > total_duration:
break
retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry)
for retry in retry_manager:
try:
# Refresh master m3u8 (if possible) to get the new URL of the previously-chose format
media_m3u8_url = ie._extract_m3u8_formats(
ws_context._master_m3u8_url(), video_id, note=False,
query={'start': downloaded_duration}, live=False)[0]['url']
# Get all fragments
media_m3u8 = ie._download_webpage(
media_m3u8_url, video_id, note=False, errnote='Unable to download media m3u8')
fragment_urls = traverse_obj(media_m3u8.splitlines(), (
lambda _, v: not v.startswith('#'), {lambda url: urljoin(media_m3u8_url, url)}))
with self.DurationLimiter(len(fragment_urls) * fragment_duration * self._PER_FRAGMENT_DOWNLOAD_RATIO):
for fragment_url in fragment_urls:
success = self._download_fragment(ctx, fragment_url, info_dict)
if not success:
return False
self._append_fragment(ctx, self._read_fragment(ctx))
downloaded_duration += fragment_duration
except (DownloadError, *network_exceptions) as err:
retry.error = err
continue
if retry_manager.error:
return False
return self._finish_frag_download(ctx, info_dict)
class DurationLimiter:
def __init__(self, target):
self.target = target
def __enter__(self):
self.start = time.time()
def __exit__(self, *exc):
remaining = self.target - (time.time() - self.start)
if remaining > 0:
time.sleep(remaining)

View File

@ -7,7 +7,6 @@
import urllib.parse
from .common import InfoExtractor, SearchInfoExtractor
from ..networking import Request
from ..networking.exceptions import HTTPError
from ..utils import (
ExtractorError,
@ -32,12 +31,56 @@
)
class NiconicoIE(InfoExtractor):
IE_NAME = 'niconico'
IE_DESC = 'ニコニコ動画'
class NiconicoBaseIE(InfoExtractor):
_NETRC_MACHINE = 'niconico'
_GEO_COUNTRIES = ['JP']
_GEO_BYPASS = False
def _perform_login(self, username, password):
login_ok = True
login_form_strs = {
'mail_tel': username,
'password': password,
}
self._request_webpage(
'https://account.nicovideo.jp/login', None,
note='Acquiring Login session')
page = self._download_webpage(
'https://account.nicovideo.jp/login/redirector?show_button_twitter=1&site=niconico&show_button_facebook=1', None,
note='Logging in', errnote='Unable to log in',
data=urlencode_postdata(login_form_strs),
headers={
'Referer': 'https://account.nicovideo.jp/login',
'Content-Type': 'application/x-www-form-urlencoded',
})
if 'oneTimePw' in page:
post_url = self._search_regex(
r'<form[^>]+action=(["\'])(?P<url>.+?)\1', page, 'post url', group='url')
page = self._download_webpage(
urljoin('https://account.nicovideo.jp', post_url), None,
note='Performing MFA', errnote='Unable to complete MFA',
data=urlencode_postdata({
'otp': self._get_tfa_info('6 digits code'),
}), headers={
'Content-Type': 'application/x-www-form-urlencoded',
})
if 'oneTimePw' in page or 'formError' in page:
err_msg = self._html_search_regex(
r'formError["\']+>(.*?)</div>', page, 'form_error',
default='There\'s an error but the message can\'t be parsed.',
flags=re.DOTALL)
self.report_warning(f'Unable to log in: MFA challenge failed, "{err_msg}"')
return False
login_ok = 'class="notice error"' not in page
if not login_ok:
self.report_warning('Unable to log in: bad username or password')
return login_ok
class NiconicoIE(NiconicoBaseIE):
IE_NAME = 'niconico'
IE_DESC = 'ニコニコ動画'
_TESTS = [{
'url': 'http://www.nicovideo.jp/watch/sm22312215',
'info_dict': {
@ -176,7 +219,6 @@ class NiconicoIE(InfoExtractor):
}]
_VALID_URL = r'https?://(?:(?:www\.|secure\.|sp\.)?nicovideo\.jp/watch|nico\.ms)/(?P<id>(?:[a-z]{2})?[0-9]+)'
_NETRC_MACHINE = 'niconico'
_API_HEADERS = {
'X-Frontend-ID': '6',
'X-Frontend-Version': '0',
@ -185,46 +227,6 @@ class NiconicoIE(InfoExtractor):
'Origin': 'https://www.nicovideo.jp',
}
def _perform_login(self, username, password):
login_ok = True
login_form_strs = {
'mail_tel': username,
'password': password,
}
self._request_webpage(
'https://account.nicovideo.jp/login', None,
note='Acquiring Login session')
page = self._download_webpage(
'https://account.nicovideo.jp/login/redirector?show_button_twitter=1&site=niconico&show_button_facebook=1', None,
note='Logging in', errnote='Unable to log in',
data=urlencode_postdata(login_form_strs),
headers={
'Referer': 'https://account.nicovideo.jp/login',
'Content-Type': 'application/x-www-form-urlencoded',
})
if 'oneTimePw' in page:
post_url = self._search_regex(
r'<form[^>]+action=(["\'])(?P<url>.+?)\1', page, 'post url', group='url')
page = self._download_webpage(
urljoin('https://account.nicovideo.jp', post_url), None,
note='Performing MFA', errnote='Unable to complete MFA',
data=urlencode_postdata({
'otp': self._get_tfa_info('6 digits code'),
}), headers={
'Content-Type': 'application/x-www-form-urlencoded',
})
if 'oneTimePw' in page or 'formError' in page:
err_msg = self._html_search_regex(
r'formError["\']+>(.*?)</div>', page, 'form_error',
default='There\'s an error but the message can\'t be parsed.',
flags=re.DOTALL)
self.report_warning(f'Unable to log in: MFA challenge failed, "{err_msg}"')
return False
login_ok = 'class="notice error"' not in page
if not login_ok:
self.report_warning('Unable to log in: bad username or password')
return login_ok
def _get_heartbeat_info(self, info_dict):
video_id, video_src_id, audio_src_id = info_dict['url'].split(':')[1].split('/')
dmc_protocol = info_dict['expected_protocol']
@ -906,7 +908,7 @@ def _real_extract(self, url):
return self.playlist_result(self._entries(list_id), list_id)
class NiconicoLiveIE(InfoExtractor):
class NiconicoLiveIE(NiconicoBaseIE):
IE_NAME = 'niconico:live'
IE_DESC = 'ニコニコ生放送'
_VALID_URL = r'https?://(?:sp\.)?live2?\.nicovideo\.jp/(?:watch|gate)/(?P<id>lv\d+)'
@ -916,17 +918,30 @@ class NiconicoLiveIE(InfoExtractor):
'info_dict': {
'id': 'lv339533123',
'title': '激辛ペヤング食べます\u202a( ;ᯅ; )\u202c(歌枠オーディション参加中)',
'view_count': 1526,
'comment_count': 1772,
'view_count': int,
'comment_count': int,
'description': '初めましてもかって言います❕\nのんびり自由に適当に暮らしてます',
'uploader': 'もか',
'channel': 'ゲストさんのコミュニティ',
'channel_id': 'co5776900',
'channel_url': 'https://com.nicovideo.jp/community/co5776900',
'timestamp': 1670677328,
'is_live': True,
'ext': None,
'live_latency': 'high',
'live_status': 'was_live',
'thumbnail': r're:^https://[\w.-]+/\w+/\w+',
'thumbnails': list,
'upload_date': '20221210',
},
'skip': 'livestream',
'params': {
'skip_download': True,
'ignore_no_formats_error': True,
},
'expected_warnings': [
'The live hasn\'t started yet or already ended.',
'No video formats found!',
'Requested format is not available',
],
}, {
'url': 'https://live2.nicovideo.jp/watch/lv339533123',
'only_matching': True,
@ -940,36 +955,17 @@ class NiconicoLiveIE(InfoExtractor):
_KNOWN_LATENCY = ('high', 'low')
def _real_extract(self, url):
video_id = self._match_id(url)
webpage, urlh = self._download_webpage_handle(f'https://live.nicovideo.jp/watch/{video_id}', video_id)
embedded_data = self._parse_json(unescapeHTML(self._search_regex(
r'<script\s+id="embedded-data"\s*data-props="(.+?)"', webpage, 'embedded data')), video_id)
ws_url = traverse_obj(embedded_data, ('site', 'relive', 'webSocketUrl'))
if not ws_url:
raise ExtractorError('The live hasn\'t started yet or already ended.', expected=True)
ws_url = update_url_query(ws_url, {
'frontend_id': traverse_obj(embedded_data, ('site', 'frontendId')) or '9',
})
hostname = remove_start(urllib.parse.urlparse(urlh.url).hostname, 'sp.')
latency = try_get(self._configuration_arg('latency'), lambda x: x[0])
if latency not in self._KNOWN_LATENCY:
latency = 'high'
def _yield_formats(self, ws_url, headers, latency, video_id, is_live):
ws = self._request_webpage(
Request(ws_url, headers={'Origin': f'https://{hostname}'}),
video_id=video_id, note='Connecting to WebSocket server')
ws_url, video_id, note='Connecting to WebSocket server', headers=headers)
self.write_debug('[debug] Sending HLS server request')
self.write_debug('Sending HLS server request')
ws.send(json.dumps({
'type': 'startWatching',
'data': {
'stream': {
'quality': 'abr',
'protocol': 'hls+fmp4',
'protocol': 'hls',
'latency': latency,
'chasePlay': False,
},
@ -977,32 +973,55 @@ def _real_extract(self, url):
'protocol': 'webSocket',
'commentable': True,
},
'reconnect': False,
},
}))
while True:
recv = ws.recv()
if not recv:
continue
data = json.loads(recv)
if not isinstance(data, dict):
continue
if data.get('type') == 'stream':
m3u8_url = data['data']['uri']
qualities = data['data']['availableQualities']
break
elif data.get('type') == 'disconnect':
self.write_debug(recv)
raise ExtractorError('Disconnected at middle of extraction')
elif data.get('type') == 'error':
self.write_debug(recv)
message = traverse_obj(data, ('body', 'code')) or recv
raise ExtractorError(message)
elif self.get_param('verbose', False):
if len(recv) > 100:
recv = recv[:100] + '...'
self.write_debug(f'Server said: {recv}')
with ws:
while True:
recv = ws.recv()
if not recv:
continue
data = json.loads(recv)
if not isinstance(data, dict):
continue
if data.get('type') == 'stream':
m3u8_url = data['data']['uri']
qualities = data['data']['availableQualities']
break
elif data.get('type') == 'disconnect':
self.write_debug(data)
raise ExtractorError('Disconnected at middle of extraction')
elif data.get('type') == 'error':
self.write_debug(data)
message = traverse_obj(data, ('data', 'code')) or recv
raise ExtractorError(message)
elif self.get_param('verbose', False):
if len(recv) > 100:
recv = recv[:100] + '...'
self.write_debug(f'Server said: {recv}')
formats = sorted(self._extract_m3u8_formats(
m3u8_url, video_id, ext='mp4', live=is_live), key=lambda f: f['tbr'], reverse=True)
for fmt, q in zip(formats, qualities[1:]):
fmt.update({
'format_id': q,
'protocol': 'niconico_live',
})
yield fmt
def _real_extract(self, url):
video_id = self._match_id(url)
webpage, urlh = self._download_webpage_handle(f'https://live.nicovideo.jp/watch/{video_id}', video_id)
headers = {'Origin': 'https://' + remove_start(urllib.parse.urlparse(urlh.url).hostname, 'sp.')}
embedded_data = self._parse_json(unescapeHTML(self._search_regex(
r'<script\s+id="embedded-data"\s*data-props="(.+?)"', webpage, 'embedded data')), video_id)
ws_url = traverse_obj(embedded_data, ('site', 'relive', 'webSocketUrl'))
if ws_url:
ws_url = update_url_query(ws_url, {
'frontend_id': traverse_obj(embedded_data, ('site', 'frontendId')) or '9',
})
title = traverse_obj(embedded_data, ('program', 'title')) or self._html_search_meta(
('og:title', 'twitter:title'), webpage, 'live title', fatal=False)
@ -1028,16 +1047,19 @@ def _real_extract(self, url):
**res,
})
formats = self._extract_m3u8_formats(m3u8_url, video_id, ext='mp4', live=True)
for fmt, q in zip(formats, reversed(qualities[1:])):
fmt.update({
'format_id': q,
'protocol': 'niconico_live',
'ws': ws,
'video_id': video_id,
'live_latency': latency,
'origin': hostname,
})
live_status, availability = self._check_status_and_availability(embedded_data, video_id)
if availability == 'premium_only':
self.raise_login_required('This video requires premium', metadata_available=True)
elif availability == 'subscriber_only':
self.raise_login_required('This video is for members only', metadata_available=True)
elif availability == 'needs_auth':
# PPV or tickets for limited time viewing
self.raise_login_required('This video requires additional steps to watch', metadata_available=True)
latency = try_get(self._configuration_arg('latency'), lambda x: x[0])
if latency not in self._KNOWN_LATENCY:
latency = 'high'
return {
'id': video_id,
@ -1052,7 +1074,79 @@ def _real_extract(self, url):
}),
'description': clean_html(traverse_obj(embedded_data, ('program', 'description'))),
'timestamp': int_or_none(traverse_obj(embedded_data, ('program', 'openTime'))),
'is_live': True,
'live_status': live_status,
'availability': availability,
'thumbnails': thumbnails,
'formats': formats,
'formats': [*self._yield_formats(
ws_url, headers, latency, video_id, live_status == 'is_live')] if ws_url else None,
'http_headers': headers,
'downloader_options': {
'live_latency': latency,
'ws_url': ws_url,
},
}
def _check_status_and_availability(self, embedded_data, video_id):
live_status = {
'Before': 'is_live',
'Open': 'was_live',
'End': 'was_live',
}.get(traverse_obj(embedded_data, ('programTimeshift', 'publication', 'status', {str})), 'is_live')
if traverse_obj(embedded_data, ('userProgramWatch', 'canWatch', {bool})):
is_member_free = traverse_obj(embedded_data, ('program', 'isMemberFree', {bool}))
is_shown = traverse_obj(embedded_data, ('program', 'trialWatch', 'isShown', {bool}))
self.write_debug(f'.program.isMemberFree: {is_member_free}; .program.trialWatch.isShown: {is_shown}')
if is_member_free is None and is_shown is None:
return live_status, self._availability()
if is_member_free is False:
availability = {'needs_auth': True}
msg = 'Paid content cannot be accessed, the video may be blank.'
else:
availability = {'needs_subscription': True}
msg = 'Restricted content cannot be accessed, a part of the video or the entire video may be blank.'
self.report_warning(msg, video_id)
return live_status, self._availability(**availability)
if traverse_obj(embedded_data, ('userProgramWatch', 'isCountryRestrictionTarget', {bool})):
self.raise_geo_restricted(countries=self._GEO_COUNTRIES, metadata_available=True)
return live_status, self._availability()
rejected_reasons = traverse_obj(embedded_data, ('userProgramWatch', 'rejectedReasons', ..., {str}))
self.write_debug(f'.userProgramWatch.rejectedReasons: {rejected_reasons!r}')
if 'programNotBegun' in rejected_reasons:
self.report_warning('Live has not started', video_id)
live_status = 'is_upcoming'
elif 'timeshiftBeforeOpen' in rejected_reasons:
self.report_warning('Live has ended but timeshift is not yet processed', video_id)
live_status = 'post_live'
elif 'noTimeshiftProgram' in rejected_reasons:
self.report_warning('Timeshift is disabled', video_id)
live_status = 'was_live'
elif any(x in ['timeshiftClosed', 'timeshiftClosedAndNotFollow'] for x in rejected_reasons):
self.report_warning('Timeshift viewing period has ended', video_id)
live_status = 'was_live'
availability = self._availability(needs_premium='notLogin' in rejected_reasons, needs_subscription=any(x in [
'notSocialGroupMember',
'notCommunityMember',
'notChannelMember',
'notCommunityMemberAndNotHaveTimeshiftTicket',
'notChannelMemberAndNotHaveTimeshiftTicket',
] for x in rejected_reasons), needs_auth=any(x in [
'timeshiftTicketExpired',
'notHaveTimeshiftTicket',
'notCommunityMemberAndNotHaveTimeshiftTicket',
'notChannelMemberAndNotHaveTimeshiftTicket',
'notHavePayTicket',
'notActivatedBySerial',
'notHavePayTicketAndNotActivatedBySerial',
'notUseTimeshiftTicket',
'notUseTimeshiftTicketOnOnceTimeshift',
'notUseTimeshiftTicketOnUnlimitedTimeshift',
] for x in rejected_reasons))
return live_status, availability

View File

@ -887,7 +887,7 @@ def run(self, info):
class FFmpegFixupM3u8PP(FFmpegFixupPostProcessor):
def _needs_fixup(self, info):
yield info['ext'] in ('mp4', 'm4a')
yield info['protocol'].startswith('m3u8')
yield info['protocol'].startswith('m3u8') or info['protocol'] == 'niconico_live'
try:
metadata = self.get_metadata_object(info['filepath'])
except PostProcessingError as e: