From 12f554873f6fc45a8252f4cb92c8d3423d2512e6 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Mon, 23 Mar 2015 17:05:07 +0100 Subject: [PATCH] Fix basic auth ofuscation and prefix (fixes #128) --- cliquet/authentication.py | 46 +++++++++++++++------------- cliquet/tests/test_authentication.py | 25 +++++++++++++-- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/cliquet/authentication.py b/cliquet/authentication.py index a7484ff3..89d0dd16 100644 --- a/cliquet/authentication.py +++ b/cliquet/authentication.py @@ -12,37 +12,41 @@ from cliquet import logger -def check_credentials(username, password, request): +class BasicAuthAuthenticationPolicy(base_auth.BasicAuthAuthenticationPolicy): """Basic auth implementation. Allow any user with any credentials (e.g. there is no need to create an account). """ - settings = request.registry.settings - is_enabled = settings['cliquet.basic_auth_enabled'] - - if not is_enabled or not username: - return - - hmac_secret = settings['cliquet.userid_hmac_secret'].encode('utf-8') - credentials = '%s:%s' % (username, password) - userid = hmac.new(hmac_secret, - credentials.encode('utf-8'), - hashlib.sha256).hexdigest() - - # Log authentication context. - logger.bind(auth_type='Basic') - - return ["basicauth_%s" % userid] - - -class BasicAuthAuthenticationPolicy(base_auth.BasicAuthAuthenticationPolicy): def __init__(self, *args, **kwargs): - super(BasicAuthAuthenticationPolicy, self).__init__(check_credentials, + noop_check = lambda *a: [Authenticated] # NOQA + super(BasicAuthAuthenticationPolicy, self).__init__(noop_check, *args, **kwargs) + def unauthenticated_userid(self, request): + settings = request.registry.settings + is_enabled = settings['cliquet.basic_auth_enabled'] + if not is_enabled: + return + + credentials = self._get_credentials(request) + if credentials: + username, password = credentials + if not username: + return + + hmac_secret = settings['cliquet.userid_hmac_secret'] + credentials = '%s:%s' % credentials + userid = hmac.new(hmac_secret.encode('utf-8'), + credentials.encode('utf-8'), + hashlib.sha256).hexdigest() + + # Log authentication context. + logger.bind(auth_type='Basic') + return "basicauth_%s" % userid + class TokenVerificationCache(object): """Verification cache class as expected by PyFxa library. diff --git a/cliquet/tests/test_authentication.py b/cliquet/tests/test_authentication.py index 5c557c3a..49a45559 100644 --- a/cliquet/tests/test_authentication.py +++ b/cliquet/tests/test_authentication.py @@ -1,13 +1,14 @@ import base64 +import hashlib import time import mock from fxa import errors as fxa_errors -from cliquet.authentication import TokenVerificationCache +from cliquet import authentication from cliquet.cache import redis as redis_backend -from .support import BaseWebTest, unittest +from .support import BaseWebTest, unittest, DummyRequest class AuthenticationPoliciesTest(BaseWebTest, unittest.TestCase): @@ -60,7 +61,7 @@ def test_views_are_forbidden_if_oauth2_scope_mismatch(self): class TokenVerificationCacheTest(unittest.TestCase): def setUp(self): cache = redis_backend.Redis(max_connections=1) - self.cache = TokenVerificationCache(cache, 0.05) + self.cache = authentication.TokenVerificationCache(cache, 0.05) def test_set_adds_the_record(self): stored = 'toto' @@ -79,3 +80,21 @@ def test_set_expires_the_value(self): time.sleep(0.1) retrieved = self.cache.get('foobar') self.assertIsNone(retrieved) + + +class BasicAuthenticationPolicyTest(unittest.TestCase): + def setUp(self): + self.policy = authentication.BasicAuthAuthenticationPolicy() + self.request = DummyRequest() + self.request.headers['Authorization'] = 'Basic bWF0Og==' + self.request.registry.settings['cliquet.basic_auth_enabled'] = True + + def test_prefixes_users_with_basicauth(self): + user_id = self.policy.unauthenticated_userid(self.request) + self.assertTrue(user_id.startswith('basicauth_')) + + @mock.patch('cliquet.authentication.hmac.new') + def test_userid_is_hashed(self, mocked): + mocked.return_value = hashlib.sha224('hashed') + user_id = self.policy.unauthenticated_userid(self.request) + self.assertIn('fc04599e80aed4e56d3465', user_id)