Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass correct algorithms argument to jwt.decode #514

Merged
merged 2 commits into from
Jan 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion social_core/backends/apple.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def decode_id_token(self, id_token):
id_token,
key=public_key,
audience=self.get_audience(),
algorithm='RS256',
algorithms=['RS256'],
)
except PyJWTError:
raise AuthFailed(self, 'Token validation failed')
Expand Down
16 changes: 4 additions & 12 deletions social_core/backends/azuread_b2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
"""

import json
import six

from cryptography.hazmat.primitives import serialization
from jwt import DecodeError, ExpiredSignature, decode as jwt_decode
from jwt.utils import base64url_decode
from jwt import DecodeError, ExpiredSignature, decode as jwt_decode, get_unverified_header


try:
Expand Down Expand Up @@ -173,22 +171,16 @@ def user_data(self, access_token, *args, **kwargs):
response = kwargs.get('response')

id_token = response.get('id_token')
if six.PY2:
# str() to fix a bug in Python's base64
# https://stackoverflow.com/a/2230623/161278
id_token = str(id_token)

jwt_header_json = base64url_decode(id_token.split('.')[0])
jwt_header = json.loads(jwt_header_json.decode('ascii'))

# `kid` is short for key id
key = self.get_public_key(jwt_header['kid'])
kid = get_unverified_header(id_token)['kid']
key = self.get_public_key(kid)

try:
return jwt_decode(
id_token,
key=key,
algorithms=jwt_header['alg'],
algorithms=['RS256'],
audience=self.setting('KEY'),
leeway=self.setting('JWT_LEEWAY', default=0),
)
Expand Down
15 changes: 3 additions & 12 deletions social_core/backends/azuread_tenant.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import base64
import json

from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
from jwt import DecodeError, ExpiredSignature, decode as jwt_decode
from jwt import DecodeError, ExpiredSignature, decode as jwt_decode, get_unverified_header

from ..exceptions import AuthTokenError
from .azuread import AzureADOAuth2
Expand Down Expand Up @@ -95,14 +92,8 @@ def get_user_id(self, details, response):
def user_data(self, access_token, *args, **kwargs):
id_token = access_token

# decode the JWT header as JSON dict
jwt_header = json.loads(
base64.b64decode(id_token.split('.', 1)[0]).decode()
)

# get key id and algorithm
key_id = jwt_header['kid']
algorithm = jwt_header['alg']
key_id = get_unverified_header(id_token)['kid']

try:
# retrieve certificate for key_id
Expand All @@ -111,7 +102,7 @@ def user_data(self, access_token, *args, **kwargs):
return jwt_decode(
id_token,
key=certificate.public_key(),
algorithms=algorithm,
algorithms=['RS256'],
audience=self.setting('KEY')
)
except (DecodeError, ExpiredSignature) as error:
Expand Down
13 changes: 0 additions & 13 deletions social_core/backends/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,3 @@ def get_key_and_secret(self):
if key_secret == (None, None):
key_secret = ('anonymous', 'anonymous')
return key_secret


class GoogleOpenId(OpenIdAuth):
name = 'google'
URL = 'https://www.google.com/accounts/o8/id'

def get_user_id(self, details, response):
"""
Return user unique id provided by service. For google user email
is unique enought to flag a single user. Email comes from schema:
http://axschema.org/contact/email
"""
return details['email']
4 changes: 2 additions & 2 deletions social_core/backends/open_id_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class OpenIdConnectAuth(BaseOAuth2):
REVOKE_TOKEN_URL = ''
USERINFO_URL = ''
JWKS_URI = ''
JWT_ALGORITHMS = ['RS256']
JWT_DECODE_OPTIONS = dict()

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -162,14 +163,13 @@ def validate_and_return_id_token(self, id_token, access_token):
if not key:
raise AuthTokenError(self, 'Signature verification failed')

alg = key['alg']
rsakey = jwk.construct(key)

try:
claims = jwt.decode(
id_token,
rsakey.to_pem().decode('utf-8'),
algorithms=[alg],
algorithms=self.JWT_ALGORITHMS,
audience=client_id,
issuer=self.id_token_issuer(),
access_token=access_token,
Expand Down
96 changes: 0 additions & 96 deletions social_core/tests/backends/test_google.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import json
import unittest2

Expand All @@ -10,7 +9,6 @@

from ..models import User
from .oauth import OAuth1Test, OAuth2Test
from .open_id import OpenIdTest
from .open_id_connect import OpenIdConnectTestMixin


Expand Down Expand Up @@ -95,100 +93,6 @@ def test_with_anonymous_key_and_secret(self):
self.do_login()


JANRAIN_NONCE = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')


class GoogleOpenIdTest(OpenIdTest):
backend_path = 'social_core.backends.google.GoogleOpenId'
expected_username = 'FooBar'
discovery_body = ''.join([
'<?xml version="1.0" encoding="UTF-8"?>',
'<xrds:XRDS xmlns:xrds="xri://$xrds" xmlns="xri://$xrd*($v*2.0)">',
'<XRD>',
'<Service priority="0">',
'<Type>http://specs.openid.net/auth/2.0/signon</Type>',
'<Type>http://openid.net/srv/ax/1.0</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/mode/popup</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/icon</Type>',
'<Type>http://specs.openid.net/extensions/pape/1.0</Type>',
'<URI>https://www.google.com/accounts/o8/ud</URI>',
'</Service>',
'<Service priority="10">',
'<Type>http://specs.openid.net/auth/2.0/signon</Type>',
'<Type>http://openid.net/srv/ax/1.0</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/mode/popup</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/icon</Type>',
'<Type>http://specs.openid.net/extensions/pape/1.0</Type>',
'<URI>https://www.google.com/accounts/o8/ud?source=mail</URI>',
'</Service>',
'<Service priority="10">',
'<Type>http://specs.openid.net/auth/2.0/signon</Type>',
'<Type>http://openid.net/srv/ax/1.0</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/mode/popup</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/icon</Type>',
'<Type>http://specs.openid.net/extensions/pape/1.0</Type>',
'<URI>https://www.google.com/accounts/o8/ud?source=gmail.com</URI>',
'</Service>',
'<Service priority="10">',
'<Type>http://specs.openid.net/auth/2.0/signon</Type>',
'<Type>http://openid.net/srv/ax/1.0</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/mode/popup</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/icon</Type>',
'<Type>http://specs.openid.net/extensions/pape/1.0</Type>',
'<URI>',
'https://www.google.com/accounts/o8/ud?source=googlemail.com',
'</URI>',
'</Service>',
'<Service priority="10">',
'<Type>http://specs.openid.net/auth/2.0/signon</Type>',
'<Type>http://openid.net/srv/ax/1.0</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/mode/popup</Type>',
'<Type>http://specs.openid.net/extensions/ui/1.0/icon</Type>',
'<Type>http://specs.openid.net/extensions/pape/1.0</Type>',
'<URI>https://www.google.com/accounts/o8/ud?source=profiles</URI>',
'</Service>',
'</XRD>',
'</xrds:XRDS>'
])
server_response = urlencode({
'janrain_nonce': JANRAIN_NONCE,
'openid.assoc_handle': 'assoc-handle',
'openid.claimed_id': 'https://www.google.com/accounts/o8/id?'
'id=some-google-id',
'openid.ext1.mode': 'fetch_response',
'openid.ext1.type.email': 'http://axschema.org/contact/email',
'openid.ext1.type.first_name': 'http://axschema.org/namePerson/first',
'openid.ext1.type.last_name': 'http://axschema.org/namePerson/last',
'openid.ext1.type.old_email': 'http://schema.openid.net/contact/email',
'openid.ext1.value.email': 'foo@bar.com',
'openid.ext1.value.first_name': 'Foo',
'openid.ext1.value.last_name': 'Bar',
'openid.ext1.value.old_email': 'foo@bar.com',
'openid.identity': 'https://www.google.com/accounts/o8/id?'
'id=some-google-id',
'openid.mode': 'id_res',
'openid.ns': 'http://specs.openid.net/auth/2.0',
'openid.ns.ext1': 'http://openid.net/srv/ax/1.0',
'openid.op_endpoint': 'https://www.google.com/accounts/o8/ud',
'openid.response_nonce': JANRAIN_NONCE + 'by95cT34vX7p9g',
'openid.return_to': 'http://myapp.com/complete/google/?'
'janrain_nonce=' + JANRAIN_NONCE,
'openid.sig': 'brT2kmu3eCzb1gQ1pbaXdnWioVM=',
'openid.signed': 'op_endpoint,claimed_id,identity,return_to,'
'response_nonce,assoc_handle,ns.ext1,ext1.mode,'
'ext1.type.old_email,ext1.value.old_email,'
'ext1.type.first_name,ext1.value.first_name,'
'ext1.type.last_name,ext1.value.last_name,'
'ext1.type.email,ext1.value.email'
})

def test_login(self):
self.do_login()

def test_partial_pipeline(self):
self.do_partial_pipeline()


class GoogleRevokeTokenTest(GoogleOAuth2Test):
def test_revoke_token(self):
self.strategy.set_settings({
Expand Down
2 changes: 1 addition & 1 deletion social_core/tests/backends/test_keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _encode(
def _decode(
token,
key=_PUBLIC_KEY,
algorithms=_ALGORITHM,
algorithms=[_ALGORITHM],
audience=_KEY,
):
return jwt.decode(token, key=key, algorithms=algorithms, audience=audience)
Expand Down