Skip to content

Commit

Permalink
Migrate from deprecated ntlm-auth to pyspnego
Browse files Browse the repository at this point in the history
Based on this[1] pull request by J. Hill-Daniel (clubby789) in the requests-ntlm sister library.

[1] requests#126
  • Loading branch information
caspervk committed Jan 17, 2022
1 parent 83931f8 commit 7305a5d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 78 deletions.
7 changes: 3 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ adaptation of https://github.com/requests/requests-ntlm.
Usage
-----

``HttpNtlmAuth`` extends HTTPX ``Auth`` base calss, so usage is simple:
``HttpNtlmAuth`` extends HTTPX ``Auth`` base class, so usage is simple:

.. code:: python
Expand Down Expand Up @@ -38,8 +38,7 @@ Requirements
------------

- httpx_
- ntlm-auth_
- pyspnego_

.. _httpx: https://github.com/encode/httpx
.. _ntlm-auth: https://github.com/jborean93/ntlm-auth

.. _pyspnego: https://github.com/jborean93/pyspnego
44 changes: 14 additions & 30 deletions httpx_ntlm/httpx_ntlm.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import base64
import binascii
import warnings
from ssl import get_server_certificate, PEM_cert_to_DER_cert
from typing import Generator

import spnego
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import UnsupportedAlgorithm
from ntlm_auth import ntlm
from httpx import Auth, Request, Response


Expand All @@ -18,24 +19,13 @@ class UnknownSignatureAlgorithmOID(Warning):
class HttpNtlmAuth(Auth):
""" HTTP NTLM Authentication Handler for HTTPX. """

def __init__(self, username, password, send_cbt=True, domain=None):
def __init__(self, username, password, send_cbt=True):
"""Create an authentication handler for NTLM over HTTP.
:param str username: Username in 'domain\\username' format
:param str password: Password
:param bool send_cbt: Will send the channel bindings over a HTTPS channel (Default: True)
:param str domain: Domain, when no provided as prefix in username (Default: None)
"""
if domain:
self.domain = domain
self.username = username
else:
try:
self.domain, self.username = username.split("\\", 1)
except ValueError:
self.username = username
self.domain = ""
if self.domain:
self.domain = self.domain.upper()
self.username = username
self.password = password
self.send_cbt = send_cbt

Expand All @@ -45,7 +35,7 @@ def auth_from_header(header):
"""
Given a WWW-Authenticate or Proxy-Authenticate header, returns the
authentication type to use. We prefer NTLM over Negotiate if the server
suppports it.
supports it.
"""
header = header.lower() or ""
if "ntlm" in header:
Expand Down Expand Up @@ -97,10 +87,10 @@ def auth_from_header(header):
# request = request.copy()
# ntlm returns the headers as a base64 encoded bytestring. Convert to
# a string.
context = ntlm.Ntlm()
negotiate_message = context.create_negotiate_message(self.domain).decode(
"ascii"
)
client = spnego.client(self.username, self.password, protocol="ntlm")
# Perform the first step of the NTLM authentication
negotiate_message = base64.b64encode(client.step()).decode("ascii")

request.headers[req_header] = f"{auth_type} {negotiate_message}"
# A streaming response breaks authentication.
# This can be fixed by not streaming this request, which is safe
Expand All @@ -126,17 +116,11 @@ def auth_from_header(header):
).strip()

# Parse the challenge in the ntlm context
context.parse_challenge_message(ntlm_header_value[len(auth_strip) :])

# build response
# Get the response based on the challenge message
authenticate_message = context.create_authenticate_message(
self.username,
self.password,
self.domain,
server_certificate_hash=server_certificate_hash,
)
authenticate_message = authenticate_message.decode("ascii")
# Parse the challenge in the ntlm context and perform
# the second step of authentication
val = base64.b64decode(ntlm_header_value[len(auth_strip):].encode())
authenticate_message = base64.b64encode(client.step(val)).decode("ascii")

auth = f"{auth_type} {authenticate_message}"
request.headers[req_header] = auth
yield request
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
httpx>=0.21.*
ntlm-auth==1.5.*
pyspnego==0.3.*
cryptography==36.0.*
flask
pytest
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

setup(
name="httpx_ntlm",
version="0.0.11",
version="1.0.0",
packages=["httpx_ntlm"],
install_requires=[
"httpx>=0.21.*",
"ntlm-auth==1.5.*",
"pyspnego==0.3.*",
"cryptography==36.0.*",
],
provides=["httpx_ntlm"],
Expand Down
56 changes: 15 additions & 41 deletions tests/unit/test_httpx_ntlm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import hashlib
import unittest
import httpx
import httpx_ntlm
Expand All @@ -13,6 +14,7 @@ def setUp(self):
self.test_server_username = "%s\\%s" % (domain, username)
self.test_server_password = password
self.auth_types = ["ntlm", "negotiate", "both"]
self.hash = hashlib.new('md4', password.encode('utf-16le')).hexdigest()

def test_httpx_ntlm(self):
for auth_type in self.auth_types:
Expand All @@ -25,6 +27,19 @@ def test_httpx_ntlm(self):

self.assertEqual(res.status_code, 200, msg="auth_type " + auth_type)

def test_requests_ntlm_hash(self):
# Test authenticating using an NTLM hash
for auth_type in self.auth_types:
res = httpx.get(
url=self.test_server_url + auth_type,
auth=httpx_ntlm.HttpNtlmAuth(
self.test_server_username,
"0" * 32 + ":" + self.hash
)
)

self.assertEqual(res.status_code, 200, msg="auth_type " + auth_type)

def test_history_is_preserved(self):
for auth_type in self.auth_types:
res = httpx.get(
Expand All @@ -36,47 +51,6 @@ def test_history_is_preserved(self):

self.assertEqual(len(res.history), 2)

def test_username_parse_backslash(self):
test_user = "domain\\user"
expected_domain = "DOMAIN"
expected_user = "user"

context = httpx_ntlm.HttpNtlmAuth(test_user, "pass")

actual_domain = context.domain
actual_user = context.username

assert actual_domain == expected_domain
assert actual_user == expected_user

def test_username_parse_at(self):
test_user = "user@domain.com"
# UPN format should not be split, since "stuff after @" not always == domain
# (eg, email address with alt UPN suffix)
expected_domain = ""
expected_user = "user@domain.com"

context = httpx_ntlm.HttpNtlmAuth(test_user, "pass")

actual_domain = context.domain
actual_user = context.username

assert actual_domain == expected_domain
assert actual_user == expected_user

def test_username_parse_no_domain(self):
test_user = "user"
expected_domain = ""
expected_user = "user"

context = httpx_ntlm.HttpNtlmAuth(test_user, "pass")

actual_domain = context.domain
actual_user = context.username

assert actual_domain == expected_domain
assert actual_user == expected_user


class TestCertificateHash(unittest.TestCase):
def test_rsa_md5(self):
Expand Down

0 comments on commit 7305a5d

Please sign in to comment.