From 22b5e395663ed76ee04018a75801d4be3e660e8b Mon Sep 17 00:00:00 2001 From: Mark Adams Date: Fri, 5 Aug 2016 23:03:18 -0500 Subject: [PATCH] Replaced tests.utils ensure_unicode and ensure_bytes with jwt.utils versions --- tests/contrib/test_algorithms.py | 36 +++++++++++----------- tests/keys/__init__.py | 8 ++--- tests/test_algorithms.py | 52 ++++++++++++++++---------------- tests/test_api_jws.py | 37 +++++++++++------------ tests/test_compat.py | 9 +++--- tests/utils.py | 16 ---------- 6 files changed, 71 insertions(+), 87 deletions(-) diff --git a/tests/contrib/test_algorithms.py b/tests/contrib/test_algorithms.py index 1fbe10d19..6d5ca75bd 100644 --- a/tests/contrib/test_algorithms.py +++ b/tests/contrib/test_algorithms.py @@ -1,8 +1,10 @@ import base64 +from jwt.utils import force_bytes, force_unicode + import pytest -from ..utils import ensure_bytes, ensure_unicode, key_path +from ..utils import key_path try: from jwt.contrib.algorithms.pycrypto import RSAAlgorithm @@ -29,7 +31,7 @@ def test_rsa_should_accept_unicode_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) with open(key_path('testkey_rsa'), 'r') as rsa_key: - algo.prepare_key(ensure_unicode(rsa_key.read())) + algo.prepare_key(force_unicode(rsa_key.read())) def test_rsa_should_reject_non_string_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) @@ -40,9 +42,9 @@ def test_rsa_should_reject_non_string_key(self): def test_rsa_sign_should_generate_correct_signature_value(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - jwt_message = ensure_bytes('Hello World!') + jwt_message = force_bytes('Hello World!') - expected_sig = base64.b64decode(ensure_bytes( + expected_sig = base64.b64decode(force_bytes( 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' @@ -63,9 +65,9 @@ def test_rsa_sign_should_generate_correct_signature_value(self): def test_rsa_verify_should_return_false_if_signature_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - jwt_message = ensure_bytes('Hello World!') + jwt_message = force_bytes('Hello World!') - jwt_sig = base64.b64decode(ensure_bytes( + jwt_sig = base64.b64decode(force_bytes( 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' @@ -73,7 +75,7 @@ def test_rsa_verify_should_return_false_if_signature_invalid(self): 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA' 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==')) - jwt_sig += ensure_bytes('123') # Signature is now invalid + jwt_sig += force_bytes('123') # Signature is now invalid with open(key_path('testkey_rsa.pub'), 'r') as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) @@ -84,9 +86,9 @@ def test_rsa_verify_should_return_false_if_signature_invalid(self): def test_rsa_verify_should_return_true_if_signature_valid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - jwt_message = ensure_bytes('Hello World!') + jwt_message = force_bytes('Hello World!') - jwt_sig = base64.b64decode(ensure_bytes( + jwt_sig = base64.b64decode(force_bytes( 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' @@ -122,14 +124,14 @@ def test_ec_should_accept_unicode_key(self): algo = ECAlgorithm(ECAlgorithm.SHA256) with open(key_path('testkey_ec'), 'r') as ec_key: - algo.prepare_key(ensure_unicode(ec_key.read())) + algo.prepare_key(force_unicode(ec_key.read())) def test_ec_sign_should_generate_correct_signature_value(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - jwt_message = ensure_bytes('Hello World!') + jwt_message = force_bytes('Hello World!') - expected_sig = base64.b64decode(ensure_bytes( + expected_sig = base64.b64decode(force_bytes( 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65')) @@ -147,14 +149,14 @@ def test_ec_sign_should_generate_correct_signature_value(self): def test_ec_verify_should_return_false_if_signature_invalid(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - jwt_message = ensure_bytes('Hello World!') + jwt_message = force_bytes('Hello World!') - jwt_sig = base64.b64decode(ensure_bytes( + jwt_sig = base64.b64decode(force_bytes( 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65')) - jwt_sig += ensure_bytes('123') # Signature is now invalid + jwt_sig += force_bytes('123') # Signature is now invalid with open(key_path('testkey_ec.pub'), 'r') as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) @@ -165,9 +167,9 @@ def test_ec_verify_should_return_false_if_signature_invalid(self): def test_ec_verify_should_return_true_if_signature_valid(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - jwt_message = ensure_bytes('Hello World!') + jwt_message = force_bytes('Hello World!') - jwt_sig = base64.b64decode(ensure_bytes( + jwt_sig = base64.b64decode(force_bytes( 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65')) diff --git a/tests/keys/__init__.py b/tests/keys/__init__.py index 47bce70b3..4fae68780 100644 --- a/tests/keys/__init__.py +++ b/tests/keys/__init__.py @@ -1,15 +1,15 @@ import json import os -from jwt.utils import base64url_decode +from jwt.utils import base64url_decode, force_bytes -from tests.utils import ensure_bytes, int_from_bytes +from tests.utils import int_from_bytes BASE_PATH = os.path.dirname(os.path.abspath(__file__)) def decode_value(val): - decoded = base64url_decode(ensure_bytes(val)) + decoded = base64url_decode(force_bytes(val)) return int_from_bytes(decoded, 'big') @@ -17,7 +17,7 @@ def load_hmac_key(): with open(os.path.join(BASE_PATH, 'jwk_hmac.json'), 'r') as infile: keyobj = json.load(infile) - return base64url_decode(ensure_bytes(keyobj['k'])) + return base64url_decode(force_bytes(keyobj['k'])) try: from cryptography.hazmat.primitives.asymmetric import ec diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py index e44d405ec..2e626ff03 100644 --- a/tests/test_algorithms.py +++ b/tests/test_algorithms.py @@ -3,12 +3,12 @@ from jwt.algorithms import Algorithm, HMACAlgorithm, NoneAlgorithm from jwt.exceptions import InvalidKeyError -from jwt.utils import base64url_decode +from jwt.utils import base64url_decode, force_bytes, force_unicode import pytest from .keys import load_hmac_key -from .utils import ensure_bytes, ensure_unicode, key_path +from .utils import key_path try: from jwt.algorithms import RSAAlgorithm, ECAlgorithm, RSAPSSAlgorithm @@ -67,7 +67,7 @@ def test_hmac_should_reject_nonstring_key(self): def test_hmac_should_accept_unicode_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) - algo.prepare_key(ensure_unicode('awesome')) + algo.prepare_key(force_unicode('awesome')) def test_hmac_should_throw_exception_if_key_is_pem_public_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) @@ -131,7 +131,7 @@ def test_rsa_should_accept_unicode_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) with open(key_path('testkey_rsa'), 'r') as rsa_key: - algo.prepare_key(ensure_unicode(rsa_key.read())) + algo.prepare_key(force_unicode(rsa_key.read())) @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') def test_rsa_should_reject_non_string_key(self): @@ -144,9 +144,9 @@ def test_rsa_should_reject_non_string_key(self): def test_rsa_verify_should_return_false_if_signature_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - message = ensure_bytes('Hello World!') + message = force_bytes('Hello World!') - sig = base64.b64decode(ensure_bytes( + sig = base64.b64decode(force_bytes( 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' @@ -154,7 +154,7 @@ def test_rsa_verify_should_return_false_if_signature_invalid(self): 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA' 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==')) - sig += ensure_bytes('123') # Signature is now invalid + sig += force_bytes('123') # Signature is now invalid with open(key_path('testkey_rsa.pub'), 'r') as keyfile: pub_key = algo.prepare_key(keyfile.read()) @@ -172,8 +172,8 @@ def test_rsa_jwk_public_and_private_keys_should_parse_and_verify(self): with open(key_path('jwk_rsa_key.json'), 'r') as keyfile: priv_key = algo.from_jwk(keyfile.read()) - signature = algo.sign(ensure_bytes('Hello World!'), priv_key) - assert algo.verify(ensure_bytes('Hello World!'), pub_key, signature) + signature = algo.sign(force_bytes('Hello World!'), priv_key) + assert algo.verify(force_bytes('Hello World!'), pub_key, signature) @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') def test_rsa_jwk_private_key_with_other_primes_is_invalid(self): @@ -330,16 +330,16 @@ def test_ec_should_accept_unicode_key(self): algo = ECAlgorithm(ECAlgorithm.SHA256) with open(key_path('testkey_ec'), 'r') as ec_key: - algo.prepare_key(ensure_unicode(ec_key.read())) + algo.prepare_key(force_unicode(ec_key.read())) @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') def test_ec_verify_should_return_false_if_signature_invalid(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - message = ensure_bytes('Hello World!') + message = force_bytes('Hello World!') # Mess up the signature by replacing a known byte - sig = base64.b64decode(ensure_bytes( + sig = base64.b64decode(force_bytes( 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65'.replace('r', 's'))) @@ -354,9 +354,9 @@ def test_ec_verify_should_return_false_if_signature_invalid(self): def test_ec_verify_should_return_false_if_signature_wrong_length(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - message = ensure_bytes('Hello World!') + message = force_bytes('Hello World!') - sig = base64.b64decode(ensure_bytes('AC+m4Jf/xI3guAC6w0w3')) + sig = base64.b64decode(force_bytes('AC+m4Jf/xI3guAC6w0w3')) with open(key_path('testkey_ec.pub'), 'r') as keyfile: pub_key = algo.prepare_key(keyfile.read()) @@ -368,7 +368,7 @@ def test_ec_verify_should_return_false_if_signature_wrong_length(self): def test_rsa_pss_sign_then_verify_should_return_true(self): algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256) - message = ensure_bytes('Hello World!') + message = force_bytes('Hello World!') with open(key_path('testkey_rsa'), 'r') as keyfile: priv_key = algo.prepare_key(keyfile.read()) @@ -384,9 +384,9 @@ def test_rsa_pss_sign_then_verify_should_return_true(self): def test_rsa_pss_verify_should_return_false_if_signature_invalid(self): algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256) - jwt_message = ensure_bytes('Hello World!') + jwt_message = force_bytes('Hello World!') - jwt_sig = base64.b64decode(ensure_bytes( + jwt_sig = base64.b64decode(force_bytes( 'ywKAUGRIDC//6X+tjvZA96yEtMqpOrSppCNfYI7NKyon3P7doud5v65oWNu' 'vQsz0fzPGfF7mQFGo9Cm9Vn0nljm4G6PtqZRbz5fXNQBH9k10gq34AtM02c' '/cveqACQ8gF3zxWh6qr9jVqIpeMEaEBIkvqG954E0HT9s9ybHShgHX9mlWk' @@ -394,7 +394,7 @@ def test_rsa_pss_verify_should_return_false_if_signature_invalid(self): 'daOCWqbpZDuLb1imKpmm8Nsm56kAxijMLZnpCcnPgyb7CqG+B93W9GHglA5' 'drUeR1gRtO7vqbZMsCAQ4bpjXxwbYyjQlEVuMl73UL6sOWg==')) - jwt_sig += ensure_bytes('123') # Signature is now invalid + jwt_sig += force_bytes('123') # Signature is now invalid with open(key_path('testkey_rsa.pub'), 'r') as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) @@ -416,7 +416,7 @@ def test_hmac_verify_should_return_true_for_test_vector(self): Reference: https://tools.ietf.org/html/rfc7520#section-4.4 """ - signing_input = ensure_bytes( + signing_input = force_bytes( 'eyJhbGciOiJIUzI1NiIsImtpZCI6IjAxOGMwYWU1LTRkOWItNDcxYi1iZmQ2LWVlZ' 'jMxNGJjNzAzNyJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ' '29pbmcgb3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIG' @@ -424,7 +424,7 @@ def test_hmac_verify_should_return_true_for_test_vector(self): 'gd2hlcmUgeW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' ) - signature = base64url_decode(ensure_bytes( + signature = base64url_decode(force_bytes( 's0h6KThzkfBBBkLspW1h84VsJZFTsPPqMDA7g1Md7p0' )) @@ -442,7 +442,7 @@ def test_rsa_verify_should_return_true_for_test_vector(self): Reference: https://tools.ietf.org/html/rfc7520#section-4.1 """ - signing_input = ensure_bytes( + signing_input = force_bytes( 'eyJhbGciOiJSUzI1NiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb' 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb' '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS' @@ -450,7 +450,7 @@ def test_rsa_verify_should_return_true_for_test_vector(self): 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' ) - signature = base64url_decode(ensure_bytes( + signature = base64url_decode(force_bytes( 'MRjdkly7_-oTPTS3AXP41iQIGKa80A0ZmTuV5MEaHoxnW2e5CZ5NlKtainoFmKZop' 'dHM1O2U4mwzJdQx996ivp83xuglII7PNDi84wnB-BDkoBwA78185hX-Es4JIwmDLJ' 'K3lfWRa-XtL0RnltuYv746iYTh_qHRD68BNt1uSNCrUCTJDt5aAE6x8wW1Kt9eRo4' @@ -473,7 +473,7 @@ def test_rsapss_verify_should_return_true_for_test_vector(self): Reference: https://tools.ietf.org/html/rfc7520#section-4.2 """ - signing_input = ensure_bytes( + signing_input = force_bytes( 'eyJhbGciOiJQUzM4NCIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb' 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb' '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS' @@ -481,7 +481,7 @@ def test_rsapss_verify_should_return_true_for_test_vector(self): 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' ) - signature = base64url_decode(ensure_bytes( + signature = base64url_decode(force_bytes( 'cu22eBqkYDKgIlTpzDXGvaFfz6WGoz7fUDcfT0kkOy42miAh2qyBzk1xEsnk2IpN6' '-tPid6VrklHkqsGqDqHCdP6O8TTB5dDDItllVo6_1OLPpcbUrhiUSMxbbXUvdvWXz' 'g-UD8biiReQFlfz28zGWVsdiNAUf8ZnyPEgVFn442ZdNqiVJRmBqrYRXe8P_ijQ7p' @@ -504,7 +504,7 @@ def test_ec_verify_should_return_true_for_test_vector(self): Reference: https://tools.ietf.org/html/rfc7520#section-4.3 """ - signing_input = ensure_bytes( + signing_input = force_bytes( 'eyJhbGciOiJFUzUxMiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb' 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb' '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS' @@ -512,7 +512,7 @@ def test_ec_verify_should_return_true_for_test_vector(self): 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' ) - signature = base64url_decode(ensure_bytes( + signature = base64url_decode(force_bytes( 'AE_R_YZCChjn4791jSQCrdPZCNYqHXCTZH0-JZGYNlaAjP2kqaluUIIUnC9qvbu9P' 'lon7KRTzoNEuT4Va2cmL1eJAQy3mtPBu_u_sDDyYjnAMDxXPn7XrT0lw-kvAD890j' 'l8e2puQens_IEKBpHABlsbEPX6sFY8OcGDqoRuBomu9xQ2' diff --git a/tests/test_api_jws.py b/tests/test_api_jws.py index c56ec4b26..855575c31 100644 --- a/tests/test_api_jws.py +++ b/tests/test_api_jws.py @@ -8,12 +8,11 @@ from jwt.exceptions import ( DecodeError, InvalidAlgorithmError, InvalidTokenError ) -from jwt.utils import base64url_decode +from jwt.utils import base64url_decode, force_bytes, force_unicode import pytest from .compat import string_types, text_type -from .utils import ensure_bytes, ensure_unicode try: from cryptography.hazmat.backends import default_backend @@ -34,7 +33,7 @@ def jws(): @pytest.fixture def payload(): """ Creates a sample jws claimset for use as a payload during tests """ - return ensure_bytes('hello world') + return force_bytes('hello world') class TestJWS: @@ -211,7 +210,7 @@ def test_decodes_valid_es384_jws(self, jws): b'HAG0_zxxu0JyINOFT2iqF3URYl9HZ8kZWMeZAtXmn6Cw' b'PXRJD2f7N-f7bJ5JeL9VT5beI2XD3FlK3GgRvI-eE-2Ik') decoded_payload = jws.decode(example_jws, example_pubkey) - json_payload = json.loads(ensure_unicode(decoded_payload)) + json_payload = json.loads(force_unicode(decoded_payload)) assert json_payload == example_payload @@ -236,7 +235,7 @@ def test_decodes_valid_rs384_jws(self, jws): b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr' b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A') decoded_payload = jws.decode(example_jws, example_pubkey) - json_payload = json.loads(ensure_unicode(decoded_payload)) + json_payload = json.loads(force_unicode(decoded_payload)) assert json_payload == example_payload @@ -410,12 +409,12 @@ def test_get_unverified_header_fails_on_bad_header_types(self, jws, payload): def test_encode_decode_with_rsa_sha256(self, jws, payload): # PEM-formatted RSA key with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: - priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()), + priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()), password=None, backend=default_backend()) jws_message = jws.encode(payload, priv_rsakey, algorithm='RS256') with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: - pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), + pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()), backend=default_backend()) jws.decode(jws_message, pub_rsakey) @@ -433,12 +432,12 @@ def test_encode_decode_with_rsa_sha256(self, jws, payload): def test_encode_decode_with_rsa_sha384(self, jws, payload): # PEM-formatted RSA key with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: - priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()), + priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()), password=None, backend=default_backend()) jws_message = jws.encode(payload, priv_rsakey, algorithm='RS384') with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: - pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), + pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()), backend=default_backend()) jws.decode(jws_message, pub_rsakey) @@ -455,12 +454,12 @@ def test_encode_decode_with_rsa_sha384(self, jws, payload): def test_encode_decode_with_rsa_sha512(self, jws, payload): # PEM-formatted RSA key with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: - priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()), + priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()), password=None, backend=default_backend()) jws_message = jws.encode(payload, priv_rsakey, algorithm='RS512') with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: - pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), + pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()), backend=default_backend()) jws.decode(jws_message, pub_rsakey) @@ -497,12 +496,12 @@ def test_rsa_related_algorithms(self, jws): def test_encode_decode_with_ecdsa_sha256(self, jws, payload): # PEM-formatted EC key with open('tests/keys/testkey_ec', 'r') as ec_priv_file: - priv_eckey = load_pem_private_key(ensure_bytes(ec_priv_file.read()), + priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()), password=None, backend=default_backend()) jws_message = jws.encode(payload, priv_eckey, algorithm='ES256') with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: - pub_eckey = load_pem_public_key(ensure_bytes(ec_pub_file.read()), + pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()), backend=default_backend()) jws.decode(jws_message, pub_eckey) @@ -520,12 +519,12 @@ def test_encode_decode_with_ecdsa_sha384(self, jws, payload): # PEM-formatted EC key with open('tests/keys/testkey_ec', 'r') as ec_priv_file: - priv_eckey = load_pem_private_key(ensure_bytes(ec_priv_file.read()), + priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()), password=None, backend=default_backend()) jws_message = jws.encode(payload, priv_eckey, algorithm='ES384') with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: - pub_eckey = load_pem_public_key(ensure_bytes(ec_pub_file.read()), + pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()), backend=default_backend()) jws.decode(jws_message, pub_eckey) @@ -542,12 +541,12 @@ def test_encode_decode_with_ecdsa_sha384(self, jws, payload): def test_encode_decode_with_ecdsa_sha512(self, jws, payload): # PEM-formatted EC key with open('tests/keys/testkey_ec', 'r') as ec_priv_file: - priv_eckey = load_pem_private_key(ensure_bytes(ec_priv_file.read()), + priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()), password=None, backend=default_backend()) jws_message = jws.encode(payload, priv_eckey, algorithm='ES512') with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: - pub_eckey = load_pem_public_key(ensure_bytes(ec_pub_file.read()), backend=default_backend()) + pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()), backend=default_backend()) jws.decode(jws_message, pub_eckey) # string-formatted key @@ -606,8 +605,8 @@ def default(self, o): token = jws.encode(payload, 'secret', headers=data, json_encoder=CustomJSONEncoder) - header = ensure_bytes(ensure_unicode(token).split('.')[0]) - header = json.loads(ensure_unicode(base64url_decode(header))) + header = force_bytes(force_unicode(token).split('.')[0]) + header = json.loads(force_unicode(base64url_decode(header))) assert 'some_decimal' in header assert header['some_decimal'] == 'it worked' diff --git a/tests/test_compat.py b/tests/test_compat.py index 6f6d6d0ef..10beb94e9 100644 --- a/tests/test_compat.py +++ b/tests/test_compat.py @@ -1,20 +1,19 @@ from jwt.compat import constant_time_compare - -from .utils import ensure_bytes +from jwt.utils import force_bytes class TestCompat: def test_constant_time_compare_returns_true_if_same(self): assert constant_time_compare( - ensure_bytes('abc'), ensure_bytes('abc') + force_bytes('abc'), force_bytes('abc') ) def test_constant_time_compare_returns_false_if_diff_lengths(self): assert not constant_time_compare( - ensure_bytes('abc'), ensure_bytes('abcd') + force_bytes('abc'), force_bytes('abcd') ) def test_constant_time_compare_returns_false_if_totally_different(self): assert not constant_time_compare( - ensure_bytes('abcd'), ensure_bytes('efgh') + force_bytes('abcd'), force_bytes('efgh') ) diff --git a/tests/utils.py b/tests/utils.py index 772270232..2e3f043b6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -4,22 +4,6 @@ from calendar import timegm from datetime import datetime -from .compat import text_type - - -def ensure_bytes(key): - if isinstance(key, text_type): - key = key.encode('utf-8') - - return key - - -def ensure_unicode(key): - if not isinstance(key, text_type): - key = key.decode() - - return key - def utc_timestamp(): return timegm(datetime.utcnow().utctimetuple())