diff --git a/devscripts/generate_aes_testdata.py b/devscripts/generate_aes_testdata.py index e3df42cc2da..c26c22a1ee6 100644 --- a/devscripts/generate_aes_testdata.py +++ b/devscripts/generate_aes_testdata.py @@ -7,14 +7,13 @@ import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from youtube_dl.utils import intlist_to_bytes -from youtube_dl.aes import aes_encrypt, key_expansion +from youtube_dl.aes import aes_encrypt secret_msg = b'Secret message goes here' def hex_str(int_list): - return codecs.encode(intlist_to_bytes(int_list), 'hex') + return codecs.encode(int_list, 'hex') def openssl_encode(algo, key, iv): @@ -24,20 +23,20 @@ def openssl_encode(algo, key, iv): return out -iv = key = [0x20, 0x15] + 14 * [0] +iv = key = b' \x15' + b'\x00' * 14 r = openssl_encode('aes-128-cbc', key, iv) print('aes_cbc_decrypt') print(repr(r)) password = key -new_key = aes_encrypt(password, key_expansion(password)) +new_key = aes_encrypt(password, password) r = openssl_encode('aes-128-ctr', new_key, iv) print('aes_decrypt_text 16') print(repr(r)) -password = key + 16 * [0] -new_key = aes_encrypt(password, key_expansion(password)) * (32 // 16) +password = key + b'\x00' * 16 +new_key = aes_encrypt(password, password) * (32 // 16) r = openssl_encode('aes-256-ctr', new_key, iv) print('aes_decrypt_text 32') print(repr(r)) diff --git a/test/test_aes.py b/test/test_aes.py index cc89fb6ab27..3575759375e 100644 --- a/test/test_aes.py +++ b/test/test_aes.py @@ -6,9 +6,10 @@ import os import sys import unittest + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from youtube_dl.aes import aes_decrypt, aes_encrypt, aes_cbc_decrypt, aes_cbc_encrypt, aes_decrypt_text +from youtube_dl.aes import _aes_decrypt, _aes_encrypt, aes_cbc_decrypt, aes_cbc_encrypt, aes_decrypt_text from youtube_dl.utils import bytes_to_intlist, intlist_to_bytes import base64 @@ -17,45 +18,40 @@ class TestAES(unittest.TestCase): def setUp(self): - self.key = self.iv = [0x20, 0x15] + 14 * [0] + self.key = self.iv = b' \x15\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' self.secret_msg = b'Secret message goes here' def test_encrypt(self): msg = b'message' - key = list(range(16)) - encrypted = aes_encrypt(bytes_to_intlist(msg), key) - decrypted = intlist_to_bytes(aes_decrypt(encrypted, key)) + key = list(range(16)) # this considered already expanded + encrypted = _aes_encrypt(bytes_to_intlist(msg), key) + decrypted = intlist_to_bytes(_aes_decrypt(encrypted, key)) self.assertEqual(decrypted, msg) def test_cbc_decrypt(self): - data = bytes_to_intlist( - b"\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd" - ) - decrypted = intlist_to_bytes(aes_cbc_decrypt(data, self.key, self.iv)) + data = b"\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd" + decrypted = aes_cbc_decrypt(data, self.key, self.iv) self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg) def test_cbc_encrypt(self): - data = bytes_to_intlist(self.secret_msg) - encrypted = intlist_to_bytes(aes_cbc_encrypt(data, self.key, self.iv)) + encrypted = aes_cbc_encrypt(self.secret_msg, self.key, self.iv) self.assertEqual( encrypted, b"\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd") def test_decrypt_text(self): - password = intlist_to_bytes(self.key).decode('utf-8') + password = self.key.decode('utf-8') encrypted = base64.b64encode( - intlist_to_bytes(self.iv[:8]) - + b'\x17\x15\x93\xab\x8d\x80V\xcdV\xe0\t\xcdo\xc2\xa5\xd8ksM\r\xe27N\xae' + self.iv[:8] + b'\x17\x15\x93\xab\x8d\x80V\xcdV\xe0\t\xcdo\xc2\xa5\xd8ksM\r\xe27N\xae' ).decode('utf-8') - decrypted = (aes_decrypt_text(encrypted, password, 16)) + decrypted = aes_decrypt_text(encrypted, password, 16) self.assertEqual(decrypted, self.secret_msg) - password = intlist_to_bytes(self.key).decode('utf-8') + password = self.key.decode('utf-8') encrypted = base64.b64encode( - intlist_to_bytes(self.iv[:8]) - + b'\x0b\xe6\xa4\xd9z\x0e\xb8\xb9\xd0\xd4i_\x85\x1d\x99\x98_\xe5\x80\xe7.\xbf\xa5\x83' + self.iv[:8] + b'\x0b\xe6\xa4\xd9z\x0e\xb8\xb9\xd0\xd4i_\x85\x1d\x99\x98_\xe5\x80\xe7.\xbf\xa5\x83' ).decode('utf-8') - decrypted = (aes_decrypt_text(encrypted, password, 32)) + decrypted = aes_decrypt_text(encrypted, password, 32) self.assertEqual(decrypted, self.secret_msg) diff --git a/youtube_dl/aes.py b/youtube_dl/aes.py index 461bb6d413a..2122ef3862b 100644 --- a/youtube_dl/aes.py +++ b/youtube_dl/aes.py @@ -2,39 +2,70 @@ from math import ceil -from .compat import compat_b64decode +from .compat import compat_b64decode, compat_struct_pack from .utils import bytes_to_intlist, intlist_to_bytes BLOCK_SIZE_BYTES = 16 -def aes_ctr_decrypt(data, key, counter): +def signature_conversion(args_conv=None, kwargs_conv=None, ret_conv=None): + def decorator(function): + def wrapper(*args, **kwargs): + result = function(*args_conv(args) if args_conv else args, + **kwargs_conv(kwargs) if kwargs_conv else kwargs) + return ret_conv(result) if ret_conv else result + return wrapper + return decorator + + +def convert_all_args_to_intlist(arguments): + return map(bytes_to_intlist, arguments) + + +def convert_expanded_key_args(arguments): + data, key = arguments + return [bytes_to_intlist(data), key_expansion(bytes_to_intlist(key))] + + +def _aes_ctr_decrypt(data, key, iv): """ Decrypt with aes in counter mode @param {int[]} data cipher @param {int[]} key 16/24/32-Byte cipher key - @param {instance} counter Instance whose next_value function (@returns {int[]} 16-Byte block) - returns the next counter block + @param {int[]} iv 16-Byte initialization vector @returns {int[]} decrypted data """ + return _aes_ctr_encrypt(data, key, iv) + + +def _aes_ctr_encrypt(data, key, iv): + """ + Encrypt with aes in counter mode + + @param {int[]} data cleartext + @param {int[]} key 16/24/32-Byte cipher key + @param {int[]} iv 16-Byte initialization vector + @returns {int[]} encrypted data + """ expanded_key = key_expansion(key) block_count = int(ceil(float(len(data)) / BLOCK_SIZE_BYTES)) + counter = iter_vector(iv) - decrypted_data = [] + encrypted_data = [] for i in range(block_count): - counter_block = counter.next_value() + counter_block = next(counter) block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES] block += [0] * (BLOCK_SIZE_BYTES - len(block)) - cipher_counter_block = aes_encrypt(counter_block, expanded_key) - decrypted_data += xor(block, cipher_counter_block) - decrypted_data = decrypted_data[:len(data)] + cipher_counter_block = _aes_encrypt(counter_block, expanded_key) + encrypted_data += xor(block, cipher_counter_block) + encrypted_data = encrypted_data[:len(data)] - return decrypted_data + return encrypted_data -def aes_cbc_decrypt(data, key, iv): +def _aes_cbc_decrypt(data, key, iv): """ Decrypt with aes in CBC mode @@ -52,7 +83,7 @@ def aes_cbc_decrypt(data, key, iv): block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES] block += [0] * (BLOCK_SIZE_BYTES - len(block)) - decrypted_block = aes_decrypt(block, expanded_key) + decrypted_block = _aes_decrypt(block, expanded_key) decrypted_data += xor(decrypted_block, previous_cipher_block) previous_cipher_block = block decrypted_data = decrypted_data[:len(data)] @@ -60,7 +91,7 @@ def aes_cbc_decrypt(data, key, iv): return decrypted_data -def aes_cbc_encrypt(data, key, iv): +def _aes_cbc_encrypt(data, key, iv): """ Encrypt with aes in CBC mode. Using PKCS#7 padding @@ -80,7 +111,7 @@ def aes_cbc_encrypt(data, key, iv): block += [remaining_length] * remaining_length mixed_block = xor(block, previous_cipher_block) - encrypted_block = aes_encrypt(mixed_block, expanded_key) + encrypted_block = _aes_encrypt(mixed_block, expanded_key) encrypted_data += encrypted_block previous_cipher_block = encrypted_block @@ -88,42 +119,67 @@ def aes_cbc_encrypt(data, key, iv): return encrypted_data -def key_expansion(data): +def _aes_gcm_decrypt_and_verify(data, key, tag, nonce): """ - Generate key schedule + Decrypt with aes in GBM mode and checks authenticity using tag - @param {int[]} data 16/24/32-Byte cipher key - @returns {int[]} 176/208/240-Byte expanded key + @param {int[]} data cipher + @param {int[]} key 16-Byte cipher key + @param {int[]} tag authentication tag + @param {int[]} nonce IV (recommended 12-Byte) + @returns {int[]} decrypted data """ - data = data[:] # copy - rcon_iteration = 1 - key_size_bytes = len(data) - expanded_key_size_bytes = (key_size_bytes // 4 + 7) * BLOCK_SIZE_BYTES + # XXX: check aes, gcm param + hash_subkey = _aes_encrypt([0] * BLOCK_SIZE_BYTES, key_expansion(key)) + + if len(nonce) == 12: + j0 = nonce + [0, 0, 0, 1] + else: + fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % BLOCK_SIZE_BYTES + 8 + ghash_in = nonce + [0] * fill + bytes_to_intlist(compat_struct_pack('B', 8 * len(nonce))) + j0 = ghash(hash_subkey, ghash_in) + + # nonce_ctr = j0[:12] + iv_ctr = inc(j0) + + decrypted_data = _aes_ctr_decrypt(data, key, iv_ctr + [0] * (BLOCK_SIZE_BYTES - len(iv_ctr))) + pad_len = len(data) // BLOCK_SIZE_BYTES * BLOCK_SIZE_BYTES + s_tag = ghash( + hash_subkey, + data + + [0] * (BLOCK_SIZE_BYTES - len(data) + pad_len) # pad + + bytes_to_intlist(compat_struct_pack('B', 0 * 8) # length of associated data + + (compat_struct_pack('B', len(data) * 8))) # length of data + ) + + if tag != _aes_ctr_encrypt(s_tag, key, j0): + raise ValueError("Mismatching authentication tag") - while len(data) < expanded_key_size_bytes: - temp = data[-4:] - temp = key_schedule_core(temp, rcon_iteration) - rcon_iteration += 1 - data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) + return decrypted_data - for _ in range(3): - temp = data[-4:] - data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) - if key_size_bytes == 32: - temp = data[-4:] - temp = sub_bytes(temp) - data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) +def _aes_decrypt(data, expanded_key): + """ + Decrypt one block with aes - for _ in range(3 if key_size_bytes == 32 else 2 if key_size_bytes == 24 else 0): - temp = data[-4:] - data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) - data = data[:expanded_key_size_bytes] + @param {int[]} data 16-Byte cipher + @param {int[]} expanded_key 176/208/240-Byte expanded key + @returns {int[]} 16-Byte state + """ + rounds = len(expanded_key) // BLOCK_SIZE_BYTES - 1 + + for i in range(rounds, 0, -1): + data = xor(data, expanded_key[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]) + if i != rounds: + data = list(iter_mix_columns(data, MIX_COLUMN_MATRIX_INV)) + data = shift_rows_inv(data) + data = sub_bytes_inv(data) + data = xor(data, expanded_key[:BLOCK_SIZE_BYTES]) return data -def aes_encrypt(data, expanded_key): +def _aes_encrypt(data, expanded_key): """ Encrypt one block with aes @@ -138,33 +194,12 @@ def aes_encrypt(data, expanded_key): data = sub_bytes(data) data = shift_rows(data) if i != rounds: - data = mix_columns(data) + data = list(iter_mix_columns(data, MIX_COLUMN_MATRIX)) data = xor(data, expanded_key[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]) return data -def aes_decrypt(data, expanded_key): - """ - Decrypt one block with aes - - @param {int[]} data 16-Byte cipher - @param {int[]} expanded_key 176/208/240-Byte expanded key - @returns {int[]} 16-Byte state - """ - rounds = len(expanded_key) // BLOCK_SIZE_BYTES - 1 - - for i in range(rounds, 0, -1): - data = xor(data, expanded_key[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES]) - if i != rounds: - data = mix_columns_inv(data) - data = shift_rows_inv(data) - data = sub_bytes_inv(data) - data = xor(data, expanded_key[:BLOCK_SIZE_BYTES]) - - return data - - def aes_decrypt_text(data, password, key_size_bytes): """ Decrypt text @@ -180,27 +215,53 @@ def aes_decrypt_text(data, password, key_size_bytes): """ NONCE_LENGTH_BYTES = 8 - data = bytes_to_intlist(compat_b64decode(data)) - password = bytes_to_intlist(password.encode('utf-8')) + data = compat_b64decode(data) + password = password.encode('utf-8') - key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password)) - key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * (key_size_bytes // BLOCK_SIZE_BYTES) + key = password[:key_size_bytes] + b'\x00' * (key_size_bytes - len(password)) + key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key) * (key_size_bytes // BLOCK_SIZE_BYTES) nonce = data[:NONCE_LENGTH_BYTES] cipher = data[NONCE_LENGTH_BYTES:] - class Counter(object): - __value = nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES) + decrypted_data = aes_ctr_decrypt(cipher, key, nonce + b'\x00' * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES)) + + return decrypted_data + + +@signature_conversion(convert_all_args_to_intlist, None, intlist_to_bytes) +def aes_ctr_decrypt(data, key, iv): + return _aes_ctr_decrypt(data, key, iv) + + +@signature_conversion(convert_all_args_to_intlist, None, intlist_to_bytes) +def aes_ctr_encrypt(data, key, iv): + return _aes_ctr_encrypt(data, key, iv) + + +@signature_conversion(convert_all_args_to_intlist, None, intlist_to_bytes) +def aes_cbc_decrypt(data, key, iv): + return _aes_cbc_decrypt(data, key, iv) + + +@signature_conversion(convert_all_args_to_intlist, None, intlist_to_bytes) +def aes_cbc_encrypt(data, key, iv): + return _aes_cbc_encrypt(data, key, iv) + + +@signature_conversion(convert_all_args_to_intlist, None, intlist_to_bytes) +def aes_gcm_decrypt_and_verify(data, key, tag, nonce): + return _aes_gcm_decrypt_and_verify(data, key, tag, nonce) + - def next_value(self): - temp = self.__value - self.__value = inc(self.__value) - return temp +@signature_conversion(convert_expanded_key_args, None, intlist_to_bytes) +def aes_decrypt(data, key): + return _aes_decrypt(data, key) - decrypted_data = aes_ctr_decrypt(cipher, key, Counter()) - plaintext = intlist_to_bytes(decrypted_data) - return plaintext +@signature_conversion(convert_expanded_key_args, None, intlist_to_bytes) +def aes_encrypt(data, key): + return _aes_encrypt(data, key) RCON = (0x8d, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80, 0x1b, 0x36) @@ -278,6 +339,47 @@ def next_value(self): 0x67, 0x4a, 0xed, 0xde, 0xc5, 0x31, 0xfe, 0x18, 0x0d, 0x63, 0x8c, 0x80, 0xc0, 0xf7, 0x70, 0x07) +def key_expansion(data): + """ + Generate key schedule + + @param {int[]} data 16/24/32-Byte cipher key + @returns {int[]} 176/208/240-Byte expanded key + """ + data = data[:] # copy + rcon_iteration = 1 + key_size_bytes = len(data) + expanded_key_size_bytes = (key_size_bytes // 4 + 7) * BLOCK_SIZE_BYTES + + while len(data) < expanded_key_size_bytes: + temp = data[-4:] + temp = key_schedule_core(temp, rcon_iteration) + rcon_iteration += 1 + data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) + + for _ in range(3): + temp = data[-4:] + data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) + + if key_size_bytes == 32: + temp = data[-4:] + temp = sub_bytes(temp) + data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) + + for _ in range(3 if key_size_bytes == 32 else 2 if key_size_bytes == 24 else 0): + temp = data[-4:] + data += xor(temp, data[-key_size_bytes: 4 - key_size_bytes]) + data = data[:expanded_key_size_bytes] + + return data + + +def iter_vector(iv): + while True: + yield iv + iv = inc(iv) + + def sub_bytes(data): return [SBOX[x] for x in data] @@ -302,48 +404,42 @@ def xor(data1, data2): return [x ^ y for x, y in zip(data1, data2)] -def rijndael_mul(a, b): - if(a == 0 or b == 0): - return 0 - return RIJNDAEL_EXP_TABLE[(RIJNDAEL_LOG_TABLE[a] + RIJNDAEL_LOG_TABLE[b]) % 0xFF] - - -def mix_column(data, matrix): - data_mixed = [] - for row in range(4): - mixed = 0 - for column in range(4): +def iter_mix_columns(data, matrix): + for i in (0, 4, 8, 12): + d0, d1, d2, d3 = data[i: i + 4] + for row in matrix: + c0, c1, c2, c3 = row # xor is (+) and (-) - mixed ^= rijndael_mul(data[column], matrix[row][column]) - data_mixed.append(mixed) - return data_mixed + v0 = (0 if d0 == 0 or c0 == 0 else + RIJNDAEL_EXP_TABLE[(RIJNDAEL_LOG_TABLE[d0] + RIJNDAEL_LOG_TABLE[c0]) % 0xFF]) + v1 = (0 if d1 == 0 or c1 == 0 else + RIJNDAEL_EXP_TABLE[(RIJNDAEL_LOG_TABLE[d1] + RIJNDAEL_LOG_TABLE[c1]) % 0xFF]) + v2 = (0 if d2 == 0 or c2 == 0 else + RIJNDAEL_EXP_TABLE[(RIJNDAEL_LOG_TABLE[d2] + RIJNDAEL_LOG_TABLE[c2]) % 0xFF]) + v3 = (0 if d3 == 0 or c3 == 0 else + RIJNDAEL_EXP_TABLE[(RIJNDAEL_LOG_TABLE[d3] + RIJNDAEL_LOG_TABLE[c3]) % 0xFF]) + yield v0 ^ v1 ^ v2 ^ v3 -def mix_columns(data, matrix=MIX_COLUMN_MATRIX): - data_mixed = [] - for i in range(4): - column = data[i * 4: (i + 1) * 4] - data_mixed += mix_column(column, matrix) - return data_mixed +def shift_rows(data): + return [data[((column + row) & 0b11) * 4 + row] for column in range(4) for row in range(4)] -def mix_columns_inv(data): - return mix_columns(data, MIX_COLUMN_MATRIX_INV) +def shift_rows_inv(data): + return [data[((column - row) & 0b11) * 4 + row] for column in range(4) for row in range(4)] -def shift_rows(data): +def shift_block(data): data_shifted = [] - for column in range(4): - for row in range(4): - data_shifted.append(data[((column + row) & 0b11) * 4 + row]) - return data_shifted + bit = 0 + for n in data: + if bit: + n |= 0x100 + bit = n & 1 + n >>= 1 + data_shifted.append(n) -def shift_rows_inv(data): - data_shifted = [] - for column in range(4): - for row in range(4): - data_shifted.append(data[((column - row) & 0b11) * 4 + row]) return data_shifted @@ -358,4 +454,49 @@ def inc(data): return data -__all__ = ['aes_encrypt', 'key_expansion', 'aes_ctr_decrypt', 'aes_cbc_decrypt', 'aes_decrypt_text'] +def block_product(block_x, block_y): + # NIST SP 800-38D, Algorithm 1 + + if len(block_x) != BLOCK_SIZE_BYTES or len(block_y) != BLOCK_SIZE_BYTES: + raise ValueError("Length of blocks need to be %d bytes" % BLOCK_SIZE_BYTES) + + block_r = [0xE1] + [0] * (BLOCK_SIZE_BYTES - 1) + block_v = block_y[:] + block_z = [0] * BLOCK_SIZE_BYTES + + for i in block_x: + for bit in range(7, -1, -1): + if i & (1 << bit): + block_z = xor(block_z, block_v) + + do_xor = block_v[-1] & 1 + block_v = shift_block(block_v) + if do_xor: + block_v = xor(block_v, block_r) + + return block_z + + +def ghash(subkey, data): + # NIST SP 800-38D, Algorithm 2 + + if len(data) % BLOCK_SIZE_BYTES: + raise ValueError("Length of data should be %d bytes" % BLOCK_SIZE_BYTES) + + last_y = [0] * BLOCK_SIZE_BYTES + for i in range(0, len(data), BLOCK_SIZE_BYTES): + block = data[i: i + BLOCK_SIZE_BYTES] # noqa: E203 + last_y = block_product(xor(last_y, block), subkey) + + return last_y + + +__all__ = [ + 'aes_cbc_encrypt', + 'aes_cbc_decrypt', + 'aes_ctr_encrypt', + 'aes_decrypt', + 'aes_encrypt', + 'aes_gcm_decrypt_and_verify', + 'key_expansion' +] diff --git a/youtube_dl/extractor/adn.py b/youtube_dl/extractor/adn.py index a55ebbcbd68..cc7626a3d9e 100644 --- a/youtube_dl/extractor/adn.py +++ b/youtube_dl/extractor/adn.py @@ -87,11 +87,11 @@ def _get_subtitles(self, sub_url, video_id): return None # http://animedigitalnetwork.fr/components/com_vodvideo/videojs/adn-vjs.min.js - dec_subtitles = intlist_to_bytes(aes_cbc_decrypt( - bytes_to_intlist(compat_b64decode(enc_subtitles[24:])), - bytes_to_intlist(binascii.unhexlify(self._K + 'ab9f52f5baae7c72')), - bytes_to_intlist(compat_b64decode(enc_subtitles[:24])) - )) + dec_subtitles = aes_cbc_decrypt( + compat_b64decode(enc_subtitles[24:]), + binascii.unhexlify(self._K + 'ab9f52f5baae7c72'), + compat_b64decode(enc_subtitles[:24]) + ) subtitles_json = self._parse_json( dec_subtitles[:-compat_ord(dec_subtitles[-1])].decode(), None, fatal=False) diff --git a/youtube_dl/extractor/anvato.py b/youtube_dl/extractor/anvato.py index b7398563b35..908c687f519 100644 --- a/youtube_dl/extractor/anvato.py +++ b/youtube_dl/extractor/anvato.py @@ -12,9 +12,7 @@ from ..aes import aes_encrypt from ..compat import compat_str from ..utils import ( - bytes_to_intlist, determine_ext, - intlist_to_bytes, int_or_none, strip_jsonp, unescapeHTML, @@ -253,8 +251,7 @@ def _get_video_json(self, access_key, video_id): server_time = self._server_time(access_key, video_id) input_data = '%d~%s~%s' % (server_time, md5_text(video_data_url), md5_text(server_time)) - auth_secret = intlist_to_bytes(aes_encrypt( - bytes_to_intlist(input_data[:64]), bytes_to_intlist(self._AUTH_KEY))) + auth_secret = aes_encrypt(input_data[:64], self._AUTH_KEY) video_data_url += '&X-Anvato-Adst-Auth=' + base64.b64encode(auth_secret).decode('ascii') anvrid = md5_text(time.time() * 1000 * random.random())[:30] diff --git a/youtube_dl/extractor/crunchyroll.py b/youtube_dl/extractor/crunchyroll.py index bc2d1fa8b04..8bbfa197b35 100644 --- a/youtube_dl/extractor/crunchyroll.py +++ b/youtube_dl/extractor/crunchyroll.py @@ -20,7 +20,6 @@ ) from ..utils import ( ExtractorError, - bytes_to_intlist, extract_attributes, float_or_none, intlist_to_bytes, @@ -277,8 +276,8 @@ def _download_webpage(self, url_or_request, *args, **kwargs): return super(CrunchyrollBaseIE, self)._download_webpage(request, *args, **kwargs) def _decrypt_subtitles(self, data, iv, id): - data = bytes_to_intlist(compat_b64decode(data)) - iv = bytes_to_intlist(compat_b64decode(iv)) + data = compat_b64decode(data) + iv = compat_b64decode(iv) id = int(id) def obfuscate_key_aux(count, modulo, start): @@ -296,13 +295,13 @@ def obfuscate_key(key): num3 = key ^ num1 num4 = num3 ^ (num3 >> 3) ^ num2 prefix = intlist_to_bytes(obfuscate_key_aux(20, 97, (1, 2))) - shaHash = bytes_to_intlist(sha1(prefix + str(num4).encode('ascii')).digest()) + shaHash = sha1(prefix + str(num4).encode('ascii')).digest() # Extend 160 Bit hash to 256 Bit - return shaHash + [0] * 12 + return shaHash + b'\x00' * 12 key = obfuscate_key(id) - decrypted_data = intlist_to_bytes(aes_cbc_decrypt(data, key, iv)) + decrypted_data = aes_cbc_decrypt(data, key, iv) return zlib.decompress(decrypted_data) def _convert_subtitles_to_srt(self, sub_root): diff --git a/youtube_dl/extractor/drtv.py b/youtube_dl/extractor/drtv.py index c0036adb619..72aa6dfb8d3 100644 --- a/youtube_dl/extractor/drtv.py +++ b/youtube_dl/extractor/drtv.py @@ -10,10 +10,8 @@ from ..aes import aes_cbc_decrypt from ..compat import compat_urllib_parse_unquote from ..utils import ( - bytes_to_intlist, ExtractorError, int_or_none, - intlist_to_bytes, float_or_none, mimetype2ext, str_or_none, @@ -191,13 +189,12 @@ def hex_to_bytes(hex): def decrypt_uri(e): n = int(e[2:10], 16) a = e[10 + n:] - data = bytes_to_intlist(hex_to_bytes(e[10:10 + n])) - key = bytes_to_intlist(hashlib.sha256( - ('%s:sRBzYNXBzkKgnjj8pGtkACch' % a).encode('utf-8')).digest()) - iv = bytes_to_intlist(hex_to_bytes(a)) + data = hex_to_bytes(e[10:10 + n]) + key = hashlib.sha256( + ('%s:sRBzYNXBzkKgnjj8pGtkACch' % a).encode('utf-8')).digest() + iv = hex_to_bytes(a) decrypted = aes_cbc_decrypt(data, key, iv) - return intlist_to_bytes( - decrypted[:-decrypted[-1]]).decode('utf-8').split('?')[0] + return decrypted[:-ord(decrypted[-1])].decode('utf-8').split('?')[0] for asset in assets: kind = asset.get('Kind') diff --git a/youtube_dl/extractor/newstube.py b/youtube_dl/extractor/newstube.py index dab4aec44a2..c203ca9ae91 100644 --- a/youtube_dl/extractor/newstube.py +++ b/youtube_dl/extractor/newstube.py @@ -7,9 +7,7 @@ from .common import InfoExtractor from ..aes import aes_cbc_decrypt from ..utils import ( - bytes_to_intlist, int_or_none, - intlist_to_bytes, parse_codecs, parse_duration, ) @@ -47,10 +45,8 @@ def _real_extract(self, url): })) key = hashlib.pbkdf2_hmac( 'sha1', video_guid.replace('-', '').encode(), enc_data[:16], 1)[:16] - dec_data = aes_cbc_decrypt( - bytes_to_intlist(enc_data[32:]), bytes_to_intlist(key), - bytes_to_intlist(enc_data[16:32])) - sources = self._parse_json(intlist_to_bytes(dec_data[:-dec_data[-1]]), video_guid) + dec_data = aes_cbc_decrypt(enc_data[32:], key, enc_data[16:32]) + sources = self._parse_json(dec_data[:-ord(dec_data[-1])], video_guid) formats = [] for source in sources: diff --git a/youtube_dl/extractor/rtl2.py b/youtube_dl/extractor/rtl2.py index 70f000ca8a7..7924e6f0b32 100644 --- a/youtube_dl/extractor/rtl2.py +++ b/youtube_dl/extractor/rtl2.py @@ -11,9 +11,7 @@ compat_str, ) from ..utils import ( - bytes_to_intlist, ExtractorError, - intlist_to_bytes, int_or_none, strip_or_none, ) @@ -142,11 +140,7 @@ def _real_extract(self, url): self._BACKWERK_BASE_URL + 'stream/video/' + video_id, video_id) data, iv = compat_b64decode(stream_data['streamUrl']).decode().split(':') - stream_url = intlist_to_bytes(aes_cbc_decrypt( - bytes_to_intlist(compat_b64decode(data)), - bytes_to_intlist(self._AES_KEY), - bytes_to_intlist(compat_b64decode(iv)) - )) + stream_url = aes_cbc_decrypt(compat_b64decode(data), self._AES_KEY, compat_b64decode(iv)) if b'rtl2_you_video_not_found' in stream_url: raise ExtractorError('video not found', expected=True)