diff --git a/test/test_youtube_signature.py b/test/test_youtube_signature.py index 0ac4fd6028..f1859a2fc6 100644 --- a/test/test_youtube_signature.py +++ b/test/test_youtube_signature.py @@ -106,6 +106,10 @@ 'https://www.youtube.com/s/player/c81bbb4a/player_ias.vflset/en_US/base.js', 'gre3EcLurNY2vqp94', 'Z9DfGxWP115WTg', ), + ( + 'https://www.youtube.com/s/player/1f7d5369/player_ias.vflset/en_US/base.js', + 'batNX7sYqIJdkJ', 'IhOkL_zxbkOZBw', + ), ] diff --git a/yt_dlp/extractor/youtube.py b/yt_dlp/extractor/youtube.py index 12634483e6..795a4f42fa 100644 --- a/yt_dlp/extractor/youtube.py +++ b/yt_dlp/extractor/youtube.py @@ -2652,9 +2652,14 @@ def _extract_n_function(self, video_id, player_url): if self.get_param('youtube_print_sig_code'): self.to_screen(f'Extracted nsig function from {player_id}:\n{func_code[1]}\n') - func = jsi.extract_function_from_code(*func_code) - return lambda s: func([s]) + + def inner(s): + ret = func([s]) + if ret.startswith('enhanced_except_'): + raise ExtractorError('Signature function returned an exception') + return ret + return inner def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False): """ diff --git a/yt_dlp/jsinterp.py b/yt_dlp/jsinterp.py index 47cca11761..d3994e90c2 100644 --- a/yt_dlp/jsinterp.py +++ b/yt_dlp/jsinterp.py @@ -24,9 +24,9 @@ '||': None, '&&': None, - '&': operator.and_, - '|': operator.or_, - '^': operator.xor, + '&': lambda a, b: (a or 0) & (b or 0), + '|': lambda a, b: (a or 0) | (b or 0), + '^': lambda a, b: (a or 0) ^ (b or 0), '===': operator.is_, '!==': operator.is_not, @@ -45,8 +45,8 @@ '-': lambda a, b: (a or 0) - (b or 0), '*': lambda a, b: (a or 0) * (b or 0), - '/': lambda a, b: (a or 0) / b, - '%': operator.mod, + '/': lambda a, b: (a or 0) / b if b else float('NaN'), + '%': lambda a, b: (a or 0) % b if b else float('NaN'), '**': operator.pow, } @@ -54,7 +54,7 @@ _COMP_OPERATORS = {'===', '!==', '==', '!=', '<=', '>=', '<', '>'} _MATCHING_PARENS = dict(zip('({[', ')}]')) -_QUOTES = '\'"' +_QUOTES = '\'"/' def _ternary(cndn, if_true=True, if_false=False): @@ -77,6 +77,12 @@ def __init__(self): ExtractorError.__init__(self, 'Invalid continue') +class JS_Throw(ExtractorError): + def __init__(self, e): + self.error = e + ExtractorError.__init__(self, f'Uncaught exception {e}') + + class LocalNameSpace(collections.ChainMap): def __setitem__(self, key, value): for scope in self.maps: @@ -131,19 +137,24 @@ def _named_object(self, namespace, obj): @staticmethod def _separate(expr, delim=',', max_split=None): + OP_CHARS = '+-*/%&|^=<>!,;' if not expr: return counters = {k: 0 for k in _MATCHING_PARENS.values()} start, splits, pos, delim_len = 0, 0, 0, len(delim) - 1 - in_quote, escaping = None, False + in_quote, escaping, after_op, in_regex_char_group = None, False, True, False for idx, char in enumerate(expr): if not in_quote and char in _MATCHING_PARENS: counters[_MATCHING_PARENS[char]] += 1 elif not in_quote and char in counters: counters[char] -= 1 elif not escaping and char in _QUOTES and in_quote in (char, None): - in_quote = None if in_quote else char + if in_quote or after_op or char != '/': + in_quote = None if in_quote and not in_regex_char_group else char + elif in_quote == '/' and char in '[]': + in_regex_char_group = char == '[' escaping = not escaping and in_quote and char == '\\' + after_op = not in_quote and char in OP_CHARS or (char == ' ' and after_op) if char != delim[pos] or any(counters.values()) or in_quote: pos = 0 @@ -210,16 +221,22 @@ def interpret_statement(self, stmt, local_vars, allow_recursion=100): if should_return: return ret, should_return - m = re.match(r'(?P(?:var|const|let)\s)|return(?:\s+|$)', stmt) + m = re.match(r'(?P(?:var|const|let)\s)|return(?:\s+|(?=["\'])|$)|(?Pthrow\s+)', stmt) if m: expr = stmt[len(m.group(0)):].strip() + if m.group('throw'): + raise JS_Throw(self.interpret_expression(expr, local_vars, allow_recursion)) should_return = not m.group('var') if not expr: return None, should_return if expr[0] in _QUOTES: inner, outer = self._separate(expr, expr[0], 1) - inner = json.loads(js_to_json(f'{inner}{expr[0]}', strict=True)) + if expr[0] == '/': + inner = inner[1:].replace('"', R'\"') + inner = re.compile(json.loads(js_to_json(f'"{inner}"', strict=True))) + else: + inner = json.loads(js_to_json(f'{inner}{expr[0]}', strict=True)) if not outer: return inner, should_return expr = self._named_object(local_vars, inner) + outer @@ -263,21 +280,36 @@ def interpret_statement(self, stmt, local_vars, allow_recursion=100): for item in self._separate(inner)]) expr = name + outer - m = re.match(r'(?Ptry|finally)\s*|(?:(?Pcatch)|(?Pfor)|(?Pswitch))\s*\(', expr) + m = re.match(rf'''(?x) + (?Ptry|finally)\s*| + (?Pcatch\s*(?P\(\s*{_NAME_RE}\s*\)))| + (?Pswitch)\s*\(| + (?Pfor)\s*\(|''', expr) if m and m.group('try'): if expr[m.end()] == '{': try_expr, expr = self._separate_at_paren(expr[m.end():], '}') else: try_expr, expr = expr[m.end() - 1:], '' - ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion) - if should_abort: - return ret, True + try: + ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion) + if should_abort: + return ret, True + except JS_Throw as e: + local_vars['__ytdlp_exception__'] = e.error + except Exception as e: + # XXX: This works for now, but makes debugging future issues very hard + local_vars['__ytdlp_exception__'] = e ret, should_abort = self.interpret_statement(expr, local_vars, allow_recursion) return ret, should_abort or should_return elif m and m.group('catch'): - # We ignore the catch block - _, expr = self._separate_at_paren(expr, '}') + catch_expr, expr = self._separate_at_paren(expr[m.end():], '}') + if '__ytdlp_exception__' in local_vars: + catch_vars = local_vars.new_child({m.group('err'): local_vars.pop('__ytdlp_exception__')}) + ret, should_abort = self.interpret_statement(catch_expr, catch_vars, allow_recursion) + if should_abort: + return ret, True + ret, should_abort = self.interpret_statement(expr, local_vars, allow_recursion) return ret, should_abort or should_return @@ -390,7 +422,7 @@ def interpret_statement(self, stmt, local_vars, allow_recursion=100): raise self.Exception(f'List index {idx} must be integer', expr) idx = int(idx) left_val[idx] = self._operator( - m.group('op'), left_val[idx], m.group('expr'), expr, local_vars, allow_recursion) + m.group('op'), self._index(left_val, idx), m.group('expr'), expr, local_vars, allow_recursion) return left_val[idx], should_return elif expr.isdigit():