diff --git a/bspump/ldap/__init__.py b/bspump/ldap/__init__.py new file mode 100644 index 00000000..5cff3c7f --- /dev/null +++ b/bspump/ldap/__init__.py @@ -0,0 +1,7 @@ +from .connection import LDAPConnection +from .source import LDAPSource + +__all__ = ( + "LDAPConnection", + "LDAPSource", +) diff --git a/bspump/ldap/connection.py b/bspump/ldap/connection.py new file mode 100644 index 00000000..7ac5a129 --- /dev/null +++ b/bspump/ldap/connection.py @@ -0,0 +1,105 @@ +import contextlib +import logging +import ldap +import ldap.resiter + +from ..abc.connection import Connection + +# + +L = logging.getLogger(__name__) + +# + + +class LDAPObject(ldap.ldapobject.LDAPObject, ldap.resiter.ResultProcessor): + pass + + +class LDAPConnection(Connection): + """ + Examples of configurations: + + [connection:ldap] + host=12.34.56.78 + port=27017 + username=cn=admin,dc=example,dc=org + password=abc123def456 + + [connection:ldap] + uri=ldaps://localhost:636 + username=cn=admin,dc=example,dc=org + password=abc123def456 + tls_cafile=/conf/cert.ca + tls_require_cert=allow + """ + + ConfigDefaults = { + "host": "localhost", + "port": 0, # = use the default 389 for non-secure and 636 for secure connection + "uri": "", + "username": "cn=admin,dc=example,dc=org", + "password": "admin", + + # Path to CA file in PEM format + "tls_cafile": "", + + # Certificate policy. + # Possible options (from python-ldap docs): + # "never" - Don’t check server cert and host name + # "allow" - Used internally by slapd server. + # "demand" - Validate peer cert chain and host name + # "hard" - Same as "demand" + "tls_require_cert": "never", + + "network_timeout": "10", # set network_timeout to -1 for no timeout + } + + def __init__(self, app, id=None, config=None): + super().__init__(app, id=id, config=config) + self.TLSEnabled = len(self.Config.get("tls_cafile")) > 0 + self.URI = self.Config.get("uri") + self.Host = self.Config.get("host") + self.Port = self.Config.getint("port") + if self.Port == 0: + self.Port = 636 if self.TLSEnabled else 389 + if len(self.URI) == 0: + self.URI = "{scheme}://{host}:{port}".format( + scheme="ldaps" if self.TLSEnabled else "ldap", + host=self.Host, + port=self.Port + ) + + @contextlib.contextmanager + def ldap_client(self): + client = LDAPObject(self.URI) + client.protocol_version = ldap.VERSION3 + client.set_option(ldap.OPT_REFERRALS, 0) + client.set_option(ldap.OPT_NETWORK_TIMEOUT, int(self.Config.get("network_timeout"))) + if self.TLSEnabled: + self._enable_tls(client) + + try: + client.simple_bind_s(self.Config.get("username"), self.Config.get("password")) + yield client + except Exception as e: + L.error("Cannot connect to LDAP server: {}".format(e), exc_info=True, struct_data={"ldap_uri": self.URI}) + finally: + client.unbind_s() + + def _enable_tls(self, client): + tls_cafile = self.Config.get("tls_cafile") + tls_require_cert = self.Config.get("tls_require_cert") + if len(tls_cafile) > 0: + client.set_option(ldap.OPT_X_TLS_CACERTFILE, tls_cafile) + if tls_require_cert == "never": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) + elif tls_require_cert == "demand": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND) + elif tls_require_cert == "allow": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW) + elif tls_require_cert == "hard": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD) + else: + raise ValueError("Invalid 'tls_require_cert' value: {}.") + client.set_option(ldap.OPT_X_TLS_NEWCTX, 0) diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py new file mode 100644 index 00000000..3c460cc0 --- /dev/null +++ b/bspump/ldap/source.py @@ -0,0 +1,90 @@ +import logging +import ldap +import ldap.controls + +from ..abc.source import TriggerSource + +# + +L = logging.getLogger(__name__) + +# + +class LDAPSource(TriggerSource): + + ConfigDefaults = { + "base": "dc=example,dc=org", + "filter": "(&(objectClass=inetOrgPerson)(cn=*))", + "attributes": "sAMAccountName cn createTimestamp modifyTimestamp UserAccountControl email", + "results_per_page": 1000, + "attribute_encoding": "utf-8", + } + + def __init__(self, app, pipeline, connection, id=None, config=None): + super().__init__(app, pipeline, id=id, config=config) + self.Connection = pipeline.locate_connection(app, connection) + self.ProactorService = app.get_service("asab.ProactorService") + self.Scope = ldap.SCOPE_SUBTREE + self.Base = self.Config.get("base") + self.Filter = self.Config.get("filter") + self.Attributes = self.Config.get("attributes").split(" ") + self.AttributeEncoding = self.Config.get("attribute_encoding") + self.ResultsPerPage = self.Config.getint("results_per_page") + + async def cycle(self): + # TODO: Throttling + await self.Pipeline.ready() + cookie = b"" + while True: + page, cookie = await self.ProactorService.execute( + self._search_worker, cookie) + for entry in page: + await self.process(entry, context={}) + if cookie is None or len(cookie) == 0: + break + + def _search_worker(self, cookie=b""): + page = [] + with self.Connection.ldap_client() as client: + paged_results_control = ldap.controls.SimplePagedResultsControl( + True, + size=self.ResultsPerPage, + cookie=cookie + ) + msgid = client.search_ext( + base=self.Base, + scope=self.Scope, + filterstr=self.Filter, + attrlist=self.Attributes, + serverctrls=[paged_results_control], + ) + res_type, res_data, res_msgid, serverctrls = client.result3(msgid) + for dn, attrs in res_data: + if dn is None: + # Skip system entries + continue + + event = {"dn": dn} + # LDAP returns all attributes as lists of bytestrings, e.g.: + # {"sAMAccountName": [b"vhavel"], ...} + # Unpack and decode them + for k, v in attrs.items(): + if isinstance(v, list): + if len(v) < 1: + continue + elif len(v) == 1: + v = v[0].decode(self.AttributeEncoding) + else: + v = [item.decode(self.AttributeEncoding) for item in v] + event[k] = v + page.append(event) + + for sc in serverctrls: + if sc.controlType == ldap.controls.SimplePagedResultsControl.controlType: + cookie = sc.cookie + break + else: + L.error("No SimplePagedResultsControl cookie in result serverctrls") + cookie = b"" + + return page, cookie diff --git a/examples/bspump-ldap-source.py b/examples/bspump-ldap-source.py new file mode 100755 index 00000000..6e43de03 --- /dev/null +++ b/examples/bspump-ldap-source.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +import logging +import bspump +import bspump.common +import bspump.trigger +import bspump.ldap + +### + +L = logging.getLogger(__name__) + +### + + +class NormalizeProcessor(bspump.Processor): + def process(self, context, event): + if "sAMAccountName" in event: + event["username"] = event.pop("sAMAccountName") + if "userAccountControl" in event: + event["suspended"] = int(event.pop("userAccountControl")) & 2 == 2 + return event + + +class LDAPPipeline(bspump.Pipeline): + def __init__(self, app, pipeline_id): + super().__init__(app, pipeline_id) + self.build( + bspump.ldap.LDAPSource(app, self, "LDAPConnection").on(bspump.trigger.PubSubTrigger(app, "RunLDAPPipeline!")), + NormalizeProcessor(app, self), + bspump.common.PPrintSink(app, self) + ) + + +if __name__ == "__main__": + """ + This simple pipeline connects to LDAP, retrieves user data and prints them to stdout. + + Make sure to update the connection and source config with the credentials for your LDAP server. + """ + app = bspump.BSPumpApplication() + svc = app.get_service("bspump.PumpService") + + # Create LDAP connection + svc.add_connection( + bspump.ldap.LDAPConnection(app, "LDAPConnection") + ) + + # Construct and register the pipeline + pl = LDAPPipeline(app, "LDAPPipeline") + svc.add_pipeline(pl) + + # Trigger the pipeline + app.PubSub.publish("RunLDAPPipeline!", asynchronously=True) + + app.run() diff --git a/setup.py b/setup.py index 4ff8a177..51b96266 100644 --- a/setup.py +++ b/setup.py @@ -91,6 +91,9 @@ def run(self): 'xxhash>=1.4.4', 'orjson>=3.4.7', ], + extras_require={ + 'ldap': 'python-ldap', + }, scripts=[ 'utils/bselastic', 'utils/bskibana',