Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to PySPNEGO #126

Merged
merged 6 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
python -m tests.test_server &

python -m pytest \
--ignore=tests/functional/test_function.py \
--ignore=tests/functional/test_functional.py \
--ignore=tests/test_server.py \
--cov requests_ntlm \
--cov-report term-missing \
Expand Down
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ Requirements
------------

- requests_
- ntlm-auth_
- pyspnego_

.. _requests: https://github.com/kennethreitz/requests/
.. _ntlm-auth: https://github.com/jborean93/ntlm-auth
.. _ntlm-auth: https://github.com/jborean93/pyspnego/

Authors
-------
Expand Down
163 changes: 90 additions & 73 deletions requests_ntlm/requests_ntlm.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
import binascii
import sys
import warnings
import base64
import typing as t

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import UnsupportedAlgorithm
from ntlm_auth import ntlm
from requests.auth import AuthBase
from requests.packages.urllib3.response import HTTPResponse
import spnego


class ShimSessionSecurity:
"""Shim used for backwards compatibility with ntlm-auth."""

def __init__(self, context: spnego.ContextProxy) -> None:
self._context = context

def wrap(self, message) -> t.Tuple[bytes, bytes]:
wrap_res = self._context.wrap(message, encrypt=True)
signature = wrap_res.data[:16]
data = wrap_res.data[16:]

return data, signature

def unwrap(self, message: bytes, signature: bytes) -> bytes:
data = signature + message
return self._context.unwrap(data).data

def get_signature(self, message: bytes) -> bytes:
return self._context.sign(message)

def verify_signature(self, message: bytes, signature: bytes) -> None:
self._context.verify(message, signature)


class HttpNtlmAuth(AuthBase):
Expand All @@ -26,18 +50,7 @@ def __init__(self, username, password, session=None, send_cbt=True):
:param str session: Unused. Kept for backwards-compatibility.
:param bool send_cbt: Will send the channel bindings over a HTTPS channel (Default: True)
"""
if ntlm is None:
raise Exception("NTLM libraries unavailable")

# parse the username
try:
self.domain, self.username = username.split('\\', 1)
except ValueError:
self.username = username
self.domain = ''

if self.domain:
self.domain = self.domain.upper()
self.username = username
self.password = password
self.send_cbt = send_cbt

Expand All @@ -46,18 +59,30 @@ def __init__(self, username, password, session=None, send_cbt=True):
# call requests_ntlm to encrypt and decrypt the messages sent after authentication
self.session_security = None

def retry_using_http_NTLM_auth(self, auth_header_field, auth_header,
response, auth_type, args):
def retry_using_http_NTLM_auth(
self,
auth_header_field,
auth_header,
response,
auth_type,
args,
):
# Get the certificate of the server if using HTTPS for CBT
server_certificate_hash = self._get_server_cert(response)
cbt = None
if server_certificate_hash:
cbt = spnego.channel_bindings.GssChannelBindings(
application_data=b"tls-server-end-point:" + server_certificate_hash
)

"""Attempt to authenticate using HTTP NTLM challenge/response."""
if auth_header in response.request.headers:
return response

content_length = int(
response.request.headers.get('Content-Length', '0'), base=10)
if hasattr(response.request.body, 'seek'):
response.request.headers.get("Content-Length", "0"), base=10
)
if hasattr(response.request.body, "seek"):
if content_length > 0:
response.request.body.seek(-content_length, 1)
else:
Expand All @@ -69,11 +94,16 @@ def retry_using_http_NTLM_auth(self, auth_header_field, auth_header,
response.raw.release_conn()
request = response.request.copy()

# ntlm returns the headers as a base64 encoded bytestring. Convert to
# a string.
context = ntlm.Ntlm()
negotiate_message = context.create_negotiate_message(self.domain).decode('ascii')
auth = u'%s %s' % (auth_type, negotiate_message)
client = spnego.client(
self.username,
self.password,
protocol="ntlm",
channel_bindings=cbt,
)
# Perform the first step of the NTLM authentication
negotiate_message = base64.b64encode(client.step()).decode()
auth = "%s %s" % (auth_type, negotiate_message)

request.headers[auth_header] = auth

# A streaming response breaks authentication.
Expand All @@ -96,73 +126,63 @@ def retry_using_http_NTLM_auth(self, auth_header_field, auth_header,
# this is important for some web applications that store
# authentication-related info in cookies (it took a long time to
# figure out)
if response2.headers.get('set-cookie'):
request.headers['Cookie'] = response2.headers.get('set-cookie')
if response2.headers.get("set-cookie"):
request.headers["Cookie"] = response2.headers.get("set-cookie")

# get the challenge
auth_header_value = response2.headers[auth_header_field]

auth_strip = auth_type + ' '
auth_strip = auth_type + " "

ntlm_header_value = next(
s for s in (val.lstrip() for val in auth_header_value.split(','))
s
for s in (val.lstrip() for val in auth_header_value.split(","))
if s.startswith(auth_strip)
).strip()

# Parse the challenge in the ntlm context
context.parse_challenge_message(ntlm_header_value[len(auth_strip):])
# Parse the challenge in the ntlm context and perform
# the second step of authentication
val = base64.b64decode(ntlm_header_value[len(auth_strip) :].encode())
authenticate_message = base64.b64encode(client.step(val)).decode()

# build response
# Get the response based on the challenge message
authenticate_message = context.create_authenticate_message(
self.username,
self.password,
self.domain,
server_certificate_hash=server_certificate_hash
)
authenticate_message = authenticate_message.decode('ascii')
auth = u'%s %s' % (auth_type, authenticate_message)
auth = "%s %s" % (auth_type, authenticate_message)
request.headers[auth_header] = auth

response3 = response2.connection.send(request, **args)

# Update the history.
response3.history.append(response)
response3.history.append(response2)

# Get the session_security object created by ntlm-auth for signing and sealing of messages
self.session_security = context.session_security
self.session_security = ShimSessionSecurity(client)

return response3

def response_hook(self, r, **kwargs):
"""The actual hook handler."""
if r.status_code == 401:
# Handle server auth.
www_authenticate = r.headers.get('www-authenticate', '').lower()
www_authenticate = r.headers.get("www-authenticate", "").lower()
auth_type = _auth_type_from_header(www_authenticate)

if auth_type is not None:
return self.retry_using_http_NTLM_auth(
'www-authenticate',
'Authorization',
"www-authenticate",
"Authorization",
r,
auth_type,
kwargs
kwargs,
)
elif r.status_code == 407:
# If we didn't have server auth, do proxy auth.
proxy_authenticate = r.headers.get(
'proxy-authenticate', ''
).lower()
proxy_authenticate = r.headers.get("proxy-authenticate", "").lower()
auth_type = _auth_type_from_header(proxy_authenticate)
if auth_type is not None:
return self.retry_using_http_NTLM_auth(
'proxy-authenticate',
'Proxy-authorization',
"proxy-authenticate",
"Proxy-authorization",
r,
auth_type,
kwargs
kwargs,
)

return r
Expand All @@ -179,36 +199,31 @@ def _get_server_cert(self, response):
:return: The hash of the DER encoded certificate at the request_url or None if not a HTTPS endpoint
"""
if self.send_cbt:
certificate_hash = None
raw_response = response.raw

if isinstance(raw_response, HTTPResponse):
if sys.version_info > (3, 0):
socket = raw_response._fp.fp.raw._sock
else:
socket = raw_response._fp.fp._sock
socket = raw_response._fp.fp.raw._sock

try:
server_certificate = socket.getpeercert(True)
except AttributeError:
pass
else:
certificate_hash = _get_certificate_hash(server_certificate)
return _get_certificate_hash(server_certificate)
else:
warnings.warn(
"Requests is running with a non urllib3 backend, cannot retrieve server certificate for CBT",
NoCertificateRetrievedWarning)
NoCertificateRetrievedWarning,
)

return certificate_hash
else:
return None
return None

def __call__(self, r):
# we must keep the connection because NTLM authenticates the
# connection, not single requests
r.headers["Connection"] = "Keep-Alive"

r.register_hook('response', self.response_hook)
r.register_hook("response", self.response_hook)
return r


Expand All @@ -218,10 +233,10 @@ def _auth_type_from_header(header):
authentication type to use. We prefer NTLM over Negotiate if the server
suppports it.
"""
if 'ntlm' in header:
return 'NTLM'
elif 'negotiate' in header:
return 'Negotiate'
if "ntlm" in header:
return "NTLM"
elif "negotiate" in header:
return "Negotiate"

return None

Expand All @@ -233,22 +248,24 @@ def _get_certificate_hash(certificate_der):
try:
hash_algorithm = cert.signature_hash_algorithm
except UnsupportedAlgorithm as ex:
warnings.warn("Failed to get signature algorithm from certificate, "
"unable to pass channel bindings: %s" % str(ex), UnknownSignatureAlgorithmOID)
warnings.warn(
"Failed to get signature algorithm from certificate, "
"unable to pass channel bindings: %s" % str(ex),
UnknownSignatureAlgorithmOID,
)
return None

# if the cert signature algorithm is either md5 or sha1 then use sha256
# otherwise use the signature algorithm
if hash_algorithm.name in ['md5', 'sha1']:
if hash_algorithm.name in ["md5", "sha1"]:
digest = hashes.Hash(hashes.SHA256(), default_backend())
else:
digest = hashes.Hash(hash_algorithm, default_backend())

digest.update(certificate_der)
certificate_hash_bytes = digest.finalize()
certificate_hash = binascii.hexlify(certificate_hash_bytes).decode().upper()

return certificate_hash
return certificate_hash_bytes


class NoCertificateRetrievedWarning(Warning):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
requests>=2.0.0
ntlm-auth>=1.0.2
pyspnego
cryptography>=1.3
flask
pytest
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
name="requests_ntlm",
version="1.2.0",
packages=["requests_ntlm"],
install_requires=["requests>=2.0.0", "ntlm-auth>=1.0.2", "cryptography>=1.3"],
install_requires=["requests>=2.0.0", "pyspnego>=0.1.6", "cryptography>=1.3"],
python_requires=">=3.7",
provides=["requests_ntlm"],
author="Ben Toews",
Expand Down
3 changes: 3 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
username = 'username'
domain = 'domain'
password = 'password'

# Genearated online as hashlib.md4 may not be available anymore
password_md4 = '8a9d093f14f8701df17732b2bb182c74'
Loading