diff --git a/yt_dlp/extractor/abematv.py b/yt_dlp/extractor/abematv.py
index ec1af1d0cc..d8ad78705c 100644
--- a/yt_dlp/extractor/abematv.py
+++ b/yt_dlp/extractor/abematv.py
@@ -1,5 +1,6 @@
import base64
import binascii
+import functools
import hashlib
import hmac
import io
@@ -20,11 +21,11 @@
decode_base_n,
int_or_none,
intlist_to_bytes,
+ OnDemandPagedList,
request_to_url,
time_seconds,
traverse_obj,
update_url_query,
- urljoin,
)
# NOTE: network handler related code is temporary thing until network stack overhaul PRs are merged (#2861/#2862)
@@ -145,17 +146,106 @@ def abematv_license_open(self, url):
class AbemaTVBaseIE(InfoExtractor):
+ _USERTOKEN = None
+ _DEVICE_ID = None
+ _MEDIATOKEN = None
+
+ _SECRETKEY = b'v+Gjs=25Aw5erR!J8ZuvRrCx*rGswhB&qdHd_SYerEWdU&a?3DzN9BRbp5KwY4hEmcj5#fykMjJ=AuWz5GSMY-d@H7DMEh3M@9n2G552Us$$k9cD=3TxwWe86!x#Zyhe'
+
+ @classmethod
+ def _generate_aks(cls, deviceid):
+ deviceid = deviceid.encode('utf-8')
+ # add 1 hour and then drop minute and secs
+ ts_1hour = int((time_seconds(hours=9) // 3600 + 1) * 3600)
+ time_struct = time.gmtime(ts_1hour)
+ ts_1hour_str = str(ts_1hour).encode('utf-8')
+
+ tmp = None
+
+ def mix_once(nonce):
+ nonlocal tmp
+ h = hmac.new(cls._SECRETKEY, digestmod=hashlib.sha256)
+ h.update(nonce)
+ tmp = h.digest()
+
+ def mix_tmp(count):
+ nonlocal tmp
+ for i in range(count):
+ mix_once(tmp)
+
+ def mix_twist(nonce):
+ nonlocal tmp
+ mix_once(base64.urlsafe_b64encode(tmp).rstrip(b'=') + nonce)
+
+ mix_once(cls._SECRETKEY)
+ mix_tmp(time_struct.tm_mon)
+ mix_twist(deviceid)
+ mix_tmp(time_struct.tm_mday % 5)
+ mix_twist(ts_1hour_str)
+ mix_tmp(time_struct.tm_hour % 5)
+
+ return base64.urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8')
+
+ def _get_device_token(self):
+ if self._USERTOKEN:
+ return self._USERTOKEN
+
+ AbemaTVBaseIE._DEVICE_ID = str(uuid.uuid4())
+ aks = self._generate_aks(self._DEVICE_ID)
+ user_data = self._download_json(
+ 'https://api.abema.io/v1/users', None, note='Authorizing',
+ data=json.dumps({
+ 'deviceId': self._DEVICE_ID,
+ 'applicationKeySecret': aks,
+ }).encode('utf-8'),
+ headers={
+ 'Content-Type': 'application/json',
+ })
+ AbemaTVBaseIE._USERTOKEN = user_data['token']
+
+ # don't allow adding it 2 times or more, though it's guarded
+ remove_opener(self._downloader, AbemaLicenseHandler)
+ add_opener(self._downloader, AbemaLicenseHandler(self))
+
+ return self._USERTOKEN
+
+ def _get_media_token(self, invalidate=False, to_show=True):
+ if not invalidate and self._MEDIATOKEN:
+ return self._MEDIATOKEN
+
+ AbemaTVBaseIE._MEDIATOKEN = self._download_json(
+ 'https://api.abema.io/v1/media/token', None, note='Fetching media token' if to_show else False,
+ query={
+ 'osName': 'android',
+ 'osVersion': '6.0.1',
+ 'osLang': 'ja_JP',
+ 'osTimezone': 'Asia/Tokyo',
+ 'appId': 'tv.abema',
+ 'appVersion': '3.27.1'
+ }, headers={
+ 'Authorization': f'bearer {self._get_device_token()}',
+ })['token']
+
+ return self._MEDIATOKEN
+
+ def _call_api(self, endpoint, video_id, query=None, note='Downloading JSON metadata'):
+ return self._download_json(
+ f'https://api.abema.io/{endpoint}', video_id, query=query or {},
+ note=note,
+ headers={
+ 'Authorization': f'bearer {self._get_device_token()}',
+ })
+
def _extract_breadcrumb_list(self, webpage, video_id):
for jld in re.finditer(
r'(?is)',
webpage):
jsonld = self._parse_json(jld.group('json_ld'), video_id, fatal=False)
- if jsonld:
- if jsonld.get('@type') != 'BreadcrumbList':
- continue
- trav = traverse_obj(jsonld, ('itemListElement', ..., 'name'))
- if trav:
- return trav
+ if traverse_obj(jsonld, '@type') != 'BreadcrumbList':
+ continue
+ items = traverse_obj(jsonld, ('itemListElement', ..., 'name'))
+ if items:
+ return items
return []
@@ -207,87 +297,7 @@ class AbemaTVIE(AbemaTVBaseIE):
},
'skip': 'Not supported until yt-dlp implements native live downloader OR AbemaTV can start a local HTTP server',
}]
- _USERTOKEN = None
- _DEVICE_ID = None
_TIMETABLE = None
- _MEDIATOKEN = None
-
- _SECRETKEY = b'v+Gjs=25Aw5erR!J8ZuvRrCx*rGswhB&qdHd_SYerEWdU&a?3DzN9BRbp5KwY4hEmcj5#fykMjJ=AuWz5GSMY-d@H7DMEh3M@9n2G552Us$$k9cD=3TxwWe86!x#Zyhe'
-
- def _generate_aks(self, deviceid):
- deviceid = deviceid.encode('utf-8')
- # add 1 hour and then drop minute and secs
- ts_1hour = int((time_seconds(hours=9) // 3600 + 1) * 3600)
- time_struct = time.gmtime(ts_1hour)
- ts_1hour_str = str(ts_1hour).encode('utf-8')
-
- tmp = None
-
- def mix_once(nonce):
- nonlocal tmp
- h = hmac.new(self._SECRETKEY, digestmod=hashlib.sha256)
- h.update(nonce)
- tmp = h.digest()
-
- def mix_tmp(count):
- nonlocal tmp
- for i in range(count):
- mix_once(tmp)
-
- def mix_twist(nonce):
- nonlocal tmp
- mix_once(base64.urlsafe_b64encode(tmp).rstrip(b'=') + nonce)
-
- mix_once(self._SECRETKEY)
- mix_tmp(time_struct.tm_mon)
- mix_twist(deviceid)
- mix_tmp(time_struct.tm_mday % 5)
- mix_twist(ts_1hour_str)
- mix_tmp(time_struct.tm_hour % 5)
-
- return base64.urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8')
-
- def _get_device_token(self):
- if self._USERTOKEN:
- return self._USERTOKEN
-
- self._DEVICE_ID = str(uuid.uuid4())
- aks = self._generate_aks(self._DEVICE_ID)
- user_data = self._download_json(
- 'https://api.abema.io/v1/users', None, note='Authorizing',
- data=json.dumps({
- 'deviceId': self._DEVICE_ID,
- 'applicationKeySecret': aks,
- }).encode('utf-8'),
- headers={
- 'Content-Type': 'application/json',
- })
- self._USERTOKEN = user_data['token']
-
- # don't allow adding it 2 times or more, though it's guarded
- remove_opener(self._downloader, AbemaLicenseHandler)
- add_opener(self._downloader, AbemaLicenseHandler(self))
-
- return self._USERTOKEN
-
- def _get_media_token(self, invalidate=False, to_show=True):
- if not invalidate and self._MEDIATOKEN:
- return self._MEDIATOKEN
-
- self._MEDIATOKEN = self._download_json(
- 'https://api.abema.io/v1/media/token', None, note='Fetching media token' if to_show else False,
- query={
- 'osName': 'android',
- 'osVersion': '6.0.1',
- 'osLang': 'ja_JP',
- 'osTimezone': 'Asia/Tokyo',
- 'appId': 'tv.abema',
- 'appVersion': '3.27.1'
- }, headers={
- 'Authorization': 'bearer ' + self._get_device_token()
- })['token']
-
- return self._MEDIATOKEN
def _perform_login(self, username, password):
if '@' in username: # don't strictly check if it's email address or not
@@ -301,13 +311,13 @@ def _perform_login(self, username, password):
method: username,
'password': password
}).encode('utf-8'), headers={
- 'Authorization': 'bearer ' + self._get_device_token(),
+ 'Authorization': f'bearer {self._get_device_token()}',
'Origin': 'https://abema.tv',
'Referer': 'https://abema.tv/',
'Content-Type': 'application/json',
})
- self._USERTOKEN = login_response['token']
+ AbemaTVBaseIE._USERTOKEN = login_response['token']
self._get_media_token(True)
def _real_extract(self, url):
@@ -442,6 +452,7 @@ def _real_extract(self, url):
class AbemaTVTitleIE(AbemaTVBaseIE):
_VALID_URL = r'https?://abema\.tv/video/title/(?P[^?/]+)'
+ _PAGE_SIZE = 25
_TESTS = [{
'url': 'https://abema.tv/video/title/90-1597',
@@ -457,18 +468,39 @@ class AbemaTVTitleIE(AbemaTVBaseIE):
'title': '真心が届く~僕とスターのオフィス・ラブ!?~',
},
'playlist_mincount': 16,
+ }, {
+ 'url': 'https://abema.tv/video/title/25-102',
+ 'info_dict': {
+ 'id': '25-102',
+ 'title': 'ソードアート・オンライン アリシゼーション',
+ },
+ 'playlist_mincount': 24,
}]
+ def _fetch_page(self, playlist_id, series_version, page):
+ programs = self._call_api(
+ f'v1/video/series/{playlist_id}/programs', playlist_id,
+ note=f'Downloading page {page + 1}',
+ query={
+ 'seriesVersion': series_version,
+ 'offset': str(page * self._PAGE_SIZE),
+ 'order': 'seq',
+ 'limit': str(self._PAGE_SIZE),
+ })
+ yield from (
+ self.url_result(f'https://abema.tv/video/episode/{x}')
+ for x in traverse_obj(programs, ('programs', ..., 'id'), default=[]))
+
+ def _entries(self, playlist_id, series_version):
+ return OnDemandPagedList(
+ functools.partial(self._fetch_page, playlist_id, series_version),
+ self._PAGE_SIZE)
+
def _real_extract(self, url):
- video_id = self._match_id(url)
- webpage = self._download_webpage(url, video_id)
+ playlist_id = self._match_id(url)
+ series_info = self._call_api(f'v1/video/series/{playlist_id}', playlist_id)
- playlist_title, breadcrumb = None, self._extract_breadcrumb_list(webpage, video_id)
- if breadcrumb:
- playlist_title = breadcrumb[-1]
-
- playlist = [
- self.url_result(urljoin('https://abema.tv/', mobj.group(1)))
- for mobj in re.finditer(r'