Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LDAP source #156

Merged
merged 10 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bspump/ldap/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .connection import LDAPConnection
from .source import LDAPSource

# Public API of the package: the names imported from the submodules above.
__all__ = ("LDAPConnection", "LDAPSource")
105 changes: 105 additions & 0 deletions bspump/ldap/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import contextlib
import logging
import ldap
import ldap.resiter

from ..abc.connection import Connection

#

# Module-level logger, named after this module.
L = logging.getLogger(__name__)

#


class LDAPObject(ldap.ldapobject.LDAPObject, ldap.resiter.ResultProcessor):
    """LDAP client class combining ``LDAPObject`` with the ``ResultProcessor`` mixin."""


class LDAPConnection(Connection):
    """
    Connection that hands out bound python-ldap clients.

    The server address is taken from ``uri`` when it is set; otherwise it is
    built from ``host`` and ``port``. TLS options are applied only when
    ``tls_cafile`` is configured.

    Examples of configurations:

        [connection:ldap]
        host=12.34.56.78
        port=389
        username=cn=admin,dc=example,dc=org
        password=abc123def456

        [connection:ldap]
        uri=ldaps://localhost:636
        username=cn=admin,dc=example,dc=org
        password=abc123def456
        tls_cafile=/conf/cert.ca
        tls_require_cert=allow
    """

    ConfigDefaults = {
        "host": "localhost",
        "port": 0,  # = use the default 389 for non-secure and 636 for secure connection
        "uri": "",
        "username": "cn=admin,dc=example,dc=org",
        "password": "admin",

        # Path to CA file in PEM format
        "tls_cafile": "",

        # Certificate policy.
        # Possible options (from python-ldap docs):
        # "never"  - Don't check server cert and host name
        # "allow"  - Used internally by slapd server.
        # "demand" - Validate peer cert chain and host name
        # "hard"   - Same as "demand"
        "tls_require_cert": "never",

        "network_timeout": "10",  # set network_timeout to -1 for no timeout
    }

    def __init__(self, app, id=None, config=None):
        """
        Read the configuration and resolve the LDAP URI.

        :param app: application object, passed to the base ``Connection``
        :param id: optional connection id, passed to the base ``Connection``
        :param config: optional configuration overrides, passed to the base ``Connection``
        """
        super().__init__(app, id=id, config=config)
        # TLS is switched on solely by the presence of a CA file
        self.TLSEnabled = len(self.Config.get("tls_cafile")) > 0
        self.URI = self.Config.get("uri")
        self.Host = self.Config.get("host")
        self.Port = self.Config.getint("port")
        if self.Port == 0:
            self.Port = 636 if self.TLSEnabled else 389
        if len(self.URI) == 0:
            # No explicit URI configured: build it from host and port
            self.URI = "{scheme}://{host}:{port}".format(
                scheme="ldaps" if self.TLSEnabled else "ldap",
                host=self.Host,
                port=self.Port
            )

    @contextlib.contextmanager
    def ldap_client(self):
        """
        Context manager yielding a client bound with the configured credentials.

        The client is unbound when the ``with`` block exits, whether or not
        the bind or the block body raised.

        :raises Exception: a bind failure is logged and re-raised unchanged;
            exceptions raised inside the ``with`` body propagate untouched.
        """
        client = LDAPObject(self.URI)
        client.protocol_version = ldap.VERSION3
        client.set_option(ldap.OPT_REFERRALS, 0)
        client.set_option(ldap.OPT_NETWORK_TIMEOUT, self.Config.getint("network_timeout"))
        if self.TLSEnabled:
            self._enable_tls(client)

        try:
            # Only the bind is guarded by the logging handler. Catching around
            # the ``yield`` as well would swallow errors from the caller's
            # ``with`` body and, on bind failure, end the generator without
            # yielding ("generator didn't yield" RuntimeError).
            try:
                client.simple_bind_s(self.Config.get("username"), self.Config.get("password"))
            except Exception as e:
                L.error(
                    "Cannot connect to LDAP server: {}".format(e),
                    exc_info=True,
                    struct_data={"ldap_uri": self.URI}
                )
                raise
            yield client
        finally:
            client.unbind_s()

    def _enable_tls(self, client):
        """
        Apply the configured TLS options to ``client``.

        :param client: LDAP client object whose options are set
        :raises ValueError: if ``tls_require_cert`` is not one of
            "never", "demand", "allow" or "hard"
        """
        tls_cafile = self.Config.get("tls_cafile")
        tls_require_cert = self.Config.get("tls_require_cert")
        if len(tls_cafile) > 0:
            client.set_option(ldap.OPT_X_TLS_CACERTFILE, tls_cafile)

        # Map the textual policy from the config to the python-ldap constant
        policies = {
            "never": ldap.OPT_X_TLS_NEVER,
            "demand": ldap.OPT_X_TLS_DEMAND,
            "allow": ldap.OPT_X_TLS_ALLOW,
            "hard": ldap.OPT_X_TLS_HARD,
        }
        if tls_require_cert not in policies:
            raise ValueError("Invalid 'tls_require_cert' value: {}.".format(tls_require_cert))
        client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, policies[tls_require_cert])

        # Set last, after all other TLS options
        client.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
90 changes: 90 additions & 0 deletions bspump/ldap/source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
import ldap
import ldap.controls

from ..abc.source import TriggerSource

#

# Module-level logger, named after this module.
L = logging.getLogger(__name__)

#

class LDAPSource(TriggerSource):
    """
    Trigger source that runs a paged LDAP search and emits one event per entry.

    Each event is a dict holding the entry's ``dn`` plus the returned
    attributes, decoded with ``attribute_encoding``. Single-valued attributes
    are unpacked to a plain string, multi-valued ones become a list of strings.
    """

    ConfigDefaults = {
        "base": "dc=example,dc=org",
        "filter": "(&(objectClass=inetOrgPerson)(cn=*))",
        # Whitespace-separated list of attribute names to request
        "attributes": "sAMAccountName cn createTimestamp modifyTimestamp UserAccountControl email",
        "results_per_page": 1000,
        "attribute_encoding": "utf-8",
    }

    def __init__(self, app, pipeline, connection, id=None, config=None):
        """
        :param app: application object
        :param pipeline: pipeline this source belongs to
        :param connection: id of the LDAP connection, resolved via the pipeline
        :param id: optional source id, passed to the base ``TriggerSource``
        :param config: optional configuration overrides, passed to the base ``TriggerSource``
        """
        super().__init__(app, pipeline, id=id, config=config)
        self.Connection = pipeline.locate_connection(app, connection)
        self.ProactorService = app.get_service("asab.ProactorService")
        self.Scope = ldap.SCOPE_SUBTREE
        self.Base = self.Config.get("base")
        self.Filter = self.Config.get("filter")
        # split() without an argument ignores repeated and leading/trailing
        # whitespace, so no empty attribute names reach the search request
        self.Attributes = self.Config.get("attributes").split()
        self.AttributeEncoding = self.Config.get("attribute_encoding")
        self.ResultsPerPage = self.Config.getint("results_per_page")

    async def cycle(self):
        """Fetch the search result page by page and process every entry."""
        # TODO: Throttling
        await self.Pipeline.ready()
        cookie = b""
        while True:
            # The search itself runs through the proactor service
            page, cookie = await self.ProactorService.execute(
                self._search_worker, cookie)
            for entry in page:
                await self.process(entry, context={})
            # An empty (or missing) cookie means there are no more pages
            if not cookie:
                break

    def _search_worker(self, cookie=b""):
        """
        Retrieve one page of search results.

        :param cookie: paging cookie returned with the previous page,
            ``b""`` for the first page
        :return: tuple ``(page, cookie)`` - list of event dicts and the cookie
            for the next page (empty when there is nothing more to fetch)
        """
        page = []
        # NOTE(review): every page is requested over a freshly opened
        # connection. Servers may tie the paging cookie to the connection that
        # issued it - confirm multi-page searches work against the target server.
        with self.Connection.ldap_client() as client:
            paged_results_control = ldap.controls.SimplePagedResultsControl(
                True,
                size=self.ResultsPerPage,
                cookie=cookie
            )
            msgid = client.search_ext(
                base=self.Base,
                scope=self.Scope,
                filterstr=self.Filter,
                attrlist=self.Attributes,
                serverctrls=[paged_results_control],
            )
            res_type, res_data, res_msgid, serverctrls = client.result3(msgid)
            for dn, attrs in res_data:
                if dn is None:
                    # Skip system entries
                    continue

                event = {"dn": dn}
                # LDAP returns all attributes as lists of bytestrings, e.g.:
                #   {"sAMAccountName": [b"vhavel"], ...}
                # Unpack and decode them
                # NOTE(review): a value that is not valid text in
                # AttributeEncoding makes decode() raise - confirm no binary
                # attributes are requested.
                for k, v in attrs.items():
                    if isinstance(v, list):
                        if len(v) < 1:
                            continue
                        elif len(v) == 1:
                            v = v[0].decode(self.AttributeEncoding)
                        else:
                            v = [item.decode(self.AttributeEncoding) for item in v]
                    event[k] = v
                page.append(event)

            # Pick the next-page cookie from the paged-results response control
            for sc in serverctrls:
                if sc.controlType == ldap.controls.SimplePagedResultsControl.controlType:
                    cookie = sc.cookie
                    break
            else:
                L.error("No SimplePagedResultsControl cookie in result serverctrls")
                cookie = b""

        return page, cookie
55 changes: 55 additions & 0 deletions examples/bspump-ldap-source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
import logging
import bspump
import bspump.common
import bspump.trigger
import bspump.ldap

###

# Module-level logger, named after this module.
L = logging.getLogger(__name__)

###


class NormalizeProcessor(bspump.Processor):
    """
    Rename ``sAMAccountName`` to ``username`` and replace
    ``userAccountControl`` with a boolean ``suspended`` flag.
    """

    def process(self, context, event):
        if "sAMAccountName" in event:
            event["username"] = event.pop("sAMAccountName")

        if "userAccountControl" in event:
            account_control = int(event.pop("userAccountControl"))
            # Bit 0x2 set means suspended (presumably the Active Directory
            # "account disabled" flag - confirm against the directory in use)
            event["suspended"] = bool(account_control & 2)

        return event


class LDAPPipeline(bspump.Pipeline):
    """Example pipeline: LDAP source -> NormalizeProcessor -> PPrintSink."""

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        # The source reads through the connection registered as "LDAPConnection"
        # and is started by the "RunLDAPPipeline!" pub/sub message
        source = bspump.ldap.LDAPSource(app, self, "LDAPConnection")
        trigger = bspump.trigger.PubSubTrigger(app, "RunLDAPPipeline!")
        triggered_source = source.on(trigger)

        normalizer = NormalizeProcessor(app, self)
        sink = bspump.common.PPrintSink(app, self)

        self.build(triggered_source, normalizer, sink)


if __name__ == "__main__":
    # This simple pipeline connects to LDAP, retrieves user data and prints
    # them to stdout.
    #
    # Make sure to update the connection and source config with the
    # credentials for your LDAP server.
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Register the LDAP connection under the id the pipeline's source looks up
    svc.add_connection(bspump.ldap.LDAPConnection(app, "LDAPConnection"))

    # Construct and register the pipeline
    pl = LDAPPipeline(app, "LDAPPipeline")
    svc.add_pipeline(pl)

    # Publish the message the pipeline's trigger is subscribed to, then run
    app.PubSub.publish("RunLDAPPipeline!", asynchronously=True)
    app.run()
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def run(self):
'xxhash>=1.4.4',
'orjson>=3.4.7',
],
extras_require={
'ldap': 'python-ldap',
},
scripts=[
'utils/bselastic',
'utils/bskibana',
Expand Down