diff --git a/README.rst b/README.rst index c182969..0ccd915 100644 --- a/README.rst +++ b/README.rst @@ -7,7 +7,7 @@ adaptation of https://github.com/requests/requests-ntlm. Usage ----- -``HttpNtlmAuth`` extends HTTPX ``Auth`` base calss, so usage is simple: +``HttpNtlmAuth`` extends HTTPX ``Auth`` base class, so usage is simple: .. code:: python @@ -38,8 +38,7 @@ Requirements ------------ - httpx_ -- ntlm-auth_ +- pyspnego_ .. _httpx: https://github.com/encode/httpx -.. _ntlm-auth: https://github.com/jborean93/ntlm-auth - +.. _pyspnego: https://github.com/jborean93/pyspnego diff --git a/httpx_ntlm/httpx_ntlm.py b/httpx_ntlm/httpx_ntlm.py index 3bd9097..503b92a 100644 --- a/httpx_ntlm/httpx_ntlm.py +++ b/httpx_ntlm/httpx_ntlm.py @@ -1,13 +1,14 @@ +import base64 import binascii import warnings from ssl import get_server_certificate, PEM_cert_to_DER_cert from typing import Generator +import spnego from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.exceptions import UnsupportedAlgorithm -from ntlm_auth import ntlm from httpx import Auth, Request, Response @@ -18,24 +19,13 @@ class UnknownSignatureAlgorithmOID(Warning): class HttpNtlmAuth(Auth): """ HTTP NTLM Authentication Handler for HTTPX. """ - def __init__(self, username, password, send_cbt=True, domain=None): + def __init__(self, username, password, send_cbt=True): """Create an authentication handler for NTLM over HTTP. :param str username: Username in 'domain\\username' format :param str password: Password :param bool send_cbt: Will send the channel bindings over a HTTPS channel (Default: True) - :param str domain: Domain, when no provided as prefix in username (Default: None) """ - if domain: - self.domain = domain - self.username = username - else: - try: - self.domain, self.username = username.split("\\", 1) - except ValueError: - self.username = username - self.domain = "" - if self.domain: - self.domain = self.domain.upper() + self.username = username self.password = password self.send_cbt = send_cbt @@ -45,7 +35,7 @@ def auth_from_header(header): """ Given a WWW-Authenticate or Proxy-Authenticate header, returns the authentication type to use. We prefer NTLM over Negotiate if the server - suppports it. + supports it. """ header = header.lower() or "" if "ntlm" in header: @@ -97,10 +87,10 @@ def auth_from_header(header): # request = request.copy() # ntlm returns the headers as a base64 encoded bytestring. Convert to # a string. - context = ntlm.Ntlm() - negotiate_message = context.create_negotiate_message(self.domain).decode( - "ascii" - ) + client = spnego.client(self.username, self.password, protocol="ntlm") + # Perform the first step of the NTLM authentication + negotiate_message = base64.b64encode(client.step()).decode("ascii") + request.headers[req_header] = f"{auth_type} {negotiate_message}" # A streaming response breaks authentication. # This can be fixed by not streaming this request, which is safe @@ -126,17 +116,11 @@ def auth_from_header(header): ).strip() # Parse the challenge in the ntlm context - context.parse_challenge_message(ntlm_header_value[len(auth_strip) :]) - - # build response - # Get the response based on the challenge message - authenticate_message = context.create_authenticate_message( - self.username, - self.password, - self.domain, - server_certificate_hash=server_certificate_hash, - ) - authenticate_message = authenticate_message.decode("ascii") + # Parse the challenge in the ntlm context and perform + # the second step of authentication + val = base64.b64decode(ntlm_header_value[len(auth_strip):].encode()) + authenticate_message = base64.b64encode(client.step(val)).decode("ascii") + auth = f"{auth_type} {authenticate_message}" request.headers[req_header] = auth yield request diff --git a/requirements.txt b/requirements.txt index 9f97169..60737c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ httpx>=0.21.* -ntlm-auth==1.5.* +pyspnego==0.3.* cryptography==36.0.* flask pytest diff --git a/setup.py b/setup.py index c77493b..710fe82 100644 --- a/setup.py +++ b/setup.py @@ -5,11 +5,11 @@ setup( name="httpx_ntlm", - version="0.0.11", + version="1.0.0", packages=["httpx_ntlm"], install_requires=[ "httpx>=0.21.*", - "ntlm-auth==1.5.*", + "pyspnego==0.3.*", "cryptography==36.0.*", ], provides=["httpx_ntlm"], diff --git a/tests/unit/test_httpx_ntlm.py b/tests/unit/test_httpx_ntlm.py index 9e60ac4..5d611a4 100644 --- a/tests/unit/test_httpx_ntlm.py +++ b/tests/unit/test_httpx_ntlm.py @@ -1,4 +1,5 @@ import base64 +import hashlib import unittest import httpx import httpx_ntlm @@ -13,6 +14,7 @@ def setUp(self): self.test_server_username = "%s\\%s" % (domain, username) self.test_server_password = password self.auth_types = ["ntlm", "negotiate", "both"] + self.hash = hashlib.new('md4', password.encode('utf-16le')).hexdigest() def test_httpx_ntlm(self): for auth_type in self.auth_types: @@ -25,6 +27,19 @@ def test_httpx_ntlm(self): self.assertEqual(res.status_code, 200, msg="auth_type " + auth_type) + def test_requests_ntlm_hash(self): + # Test authenticating using an NTLM hash + for auth_type in self.auth_types: + res = httpx.get( + url=self.test_server_url + auth_type, + auth=httpx_ntlm.HttpNtlmAuth( + self.test_server_username, + "0" * 32 + ":" + self.hash + ) + ) + + self.assertEqual(res.status_code, 200, msg="auth_type " + auth_type) + def test_history_is_preserved(self): for auth_type in self.auth_types: res = httpx.get( @@ -36,47 +51,6 @@ def test_history_is_preserved(self): self.assertEqual(len(res.history), 2) - def test_username_parse_backslash(self): - test_user = "domain\\user" - expected_domain = "DOMAIN" - expected_user = "user" - - context = httpx_ntlm.HttpNtlmAuth(test_user, "pass") - - actual_domain = context.domain - actual_user = context.username - - assert actual_domain == expected_domain - assert actual_user == expected_user - - def test_username_parse_at(self): - test_user = "user@domain.com" - # UPN format should not be split, since "stuff after @" not always == domain - # (eg, email address with alt UPN suffix) - expected_domain = "" - expected_user = "user@domain.com" - - context = httpx_ntlm.HttpNtlmAuth(test_user, "pass") - - actual_domain = context.domain - actual_user = context.username - - assert actual_domain == expected_domain - assert actual_user == expected_user - - def test_username_parse_no_domain(self): - test_user = "user" - expected_domain = "" - expected_user = "user" - - context = httpx_ntlm.HttpNtlmAuth(test_user, "pass") - - actual_domain = context.domain - actual_user = context.username - - assert actual_domain == expected_domain - assert actual_user == expected_user - class TestCertificateHash(unittest.TestCase): def test_rsa_md5(self):