From 5c50674c1a4f84397fe75747f3abbeeb3dd893ce Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Fri, 9 Dec 2022 13:19:56 +0100 Subject: [PATCH 01/10] Add python-ldap to setup extras --- setup.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/setup.py b/setup.py index 4ff8a177..51b96266 100644 --- a/setup.py +++ b/setup.py @@ -91,6 +91,9 @@ def run(self): 'xxhash>=1.4.4', 'orjson>=3.4.7', ], + extras_require={ + 'ldap': 'python-ldap', + }, scripts=[ 'utils/bselastic', 'utils/bskibana', From 9ce80bbf8d3c9657b56c35d48c977adfbcc51ce7 Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Fri, 9 Dec 2022 18:52:42 +0100 Subject: [PATCH 02/10] Draft LDAP source --- bspump/ldap/__init__.py | 7 +++ bspump/ldap/connection.py | 101 ++++++++++++++++++++++++++++++++++++++ bspump/ldap/source.py | 64 ++++++++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 bspump/ldap/__init__.py create mode 100644 bspump/ldap/connection.py create mode 100644 bspump/ldap/source.py diff --git a/bspump/ldap/__init__.py b/bspump/ldap/__init__.py new file mode 100644 index 00000000..5cff3c7f --- /dev/null +++ b/bspump/ldap/__init__.py @@ -0,0 +1,7 @@ +from .connection import LDAPConnection +from .source import LDAPSource + +__all__ = ( + "LDAPConnection", + "LDAPSource", +) diff --git a/bspump/ldap/connection.py b/bspump/ldap/connection.py new file mode 100644 index 00000000..1cbdfcf8 --- /dev/null +++ b/bspump/ldap/connection.py @@ -0,0 +1,101 @@ +import logging +import ldap +import ldap.resiter + +from ..abc.connection import Connection + +# + +L = logging.getLogger(__name__) + +# + + +class LDAPObject(ldap.ldapobject.LDAPObject, ldap.resiter.ResultProcessor): + pass + + +class LDAPConnection(Connection): + """ + Examples of configurations: + + [connection:ldap] + host=12.34.56.78 + port=27017 + username=cn=admin,dc=example,dc=org + password=abc123def456 + + [connection:ldap] + uri=ldaps://localhost:636 + username=cn=admin,dc=example,dc=org + password=abc123def456 + """ + + ConfigDefaults = { + "host": "localhost", + "port": 0, # = use the default 389 for non-secure and 636 for secure connection + "uri": "", + "username": "cn=admin,dc=example,dc=org", + "password": "admin", + "base": "dc=example,dc=org", + + # Path to CA file in PEM format + "tls_cafile": "", + + # Certificate policy. + # Possible options (from python-ldap docs): + # "never" - Don’t check server cert and host name + # "allow" - Used internally by slapd server. + # "demand" - Validate peer cert chain and host name + # "hard" - Same as "demand" + "tls_require_cert": "never", + + "network_timeout": "10", # set network_timeout to -1 for no timeout + } + + def __init__(self, app, id=None, config=None): + super().__init__(app, id=id, config=config) + self.TLSEnabled = len(self.Config.get("tls_cafile")) > 0 + self.URI = self.Config.get("uri") + self.Host = self.Config.get("host") + self.Port = self.Config.get("port") + if self.Port == 0: + self.Port = 636 if self.TLSEnabled else 389 + if len(self.LdapUri) == 0: + self.URI = "{scheme}://{host}:{port}".format( + scheme="ldaps" if self.TLSEnabled else "ldap", + host=self.Host, + port=self.Port + ) + + @contextlib.contextmanager + def ldap_client(self): + client = LDAPObject(self.LdapUri) + client.protocol_version = ldap.VERSION3 + client.set_option(ldap.OPT_REFERRALS, 0) + client.set_option(ldap.OPT_NETWORK_TIMEOUT, int(self.Config.get("network_timeout"))) + if self.TLSEnabled: + self._enable_tls(client) + + client.simple_bind_s(self.Config.get("username"), self.Config.get("password")) + try: + yield client + finally: + client.unbind_s() + + def _enable_tls(self, client): + tls_cafile = self.Config.get("tls_cafile") + tls_require_cert = self.Config.get("tls_require_cert") + if len(tls_cafile) > 0: + client.set_option(ldap.OPT_X_TLS_CACERTFILE, tls_cafile) + if tls_require_cert == "never": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) + elif tls_require_cert == "demand": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND) + elif tls_require_cert == "allow": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW) + elif tls_require_cert == "hard": + client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD) + else: + raise ValueError("Invalid 'tls_require_cert' value: {}.") + client.set_option(ldap.OPT_X_TLS_NEWCTX, 0) diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py new file mode 100644 index 00000000..5f8f4a41 --- /dev/null +++ b/bspump/ldap/source.py @@ -0,0 +1,64 @@ +import asyncio +import logging +from bspump.abc.source import TriggerSource + +# + +L = logging.getLogger(__name__) + +# + +class LDAPSource(TriggerSource): + + ConfigDefaults = { + "filter": "(&(objectClass=inetOrgPerson)(cn=*))", + "attributes": "sAMAccountName cn createTimestamp modifyTimestamp UserAccountControl email", + "_results_per_page": 1000, + } + + def __init__(self, app, pipeline, connection, query_parms=None, id=None, config=None): + super().__init__(app, pipeline, id=id, config=config) + self.Scope = ldap.SCOPE_SUBTREE + self.Connection = pipeline.locate_connection(app, connection) + self.Base = self.Config.get("base") + self.Filter = self.Config.get("filter") + self.Attributes = self.Config.get("attributes").split(" ") + self.ResultsPerPage = self.Config.getint("_results_per_page") + + async def cycle(self): + # TODO: Throttling + await self.Pipeline.ready() + while True: + page, cookie = await self.ProactorService.execute( + self._search_worker, cookie) + async for entry in page: + await self.process(entry, context={}) + if cookie is None or len(cookie) == 0: + break + + def _search_worker(self, cookie=b""): + page = [] + with self.Connection.ldap_client() as client: + paged_results_control = ldap.controls.SimplePagedResultsControl( + True, + size=self.ResultsPerPage, + cookie=cookie + ) + msgid = client.search_ext( + base=self.Base, + scope=self.Scope, + filterstr=self.Filter, + attrlist=self.Attributes, + serverctrls=[paged_results_control], + ) + res_type, res_data, res_msgid, serverctrls = client.result3(msgid) + for dn, attrs in res_data: + page.append({"dn": dn, **attrs}) + + for sc in serverctrls: + if sc.controlType == ldap.controls.SimplePagedResultsControl.controlType: + cookie = sc.cookie + else: + L.error("Server ignores RFC 2696 control: No serverctrls in result") + + return page, cookie From 766f44aa142c33472a711adcae92ea806a05daaf Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Mon, 12 Dec 2022 10:23:13 +0100 Subject: [PATCH 03/10] LDAP: Debugging --- bspump/ldap/connection.py | 7 ++++--- bspump/ldap/source.py | 14 +++++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/bspump/ldap/connection.py b/bspump/ldap/connection.py index 1cbdfcf8..18ba4413 100644 --- a/bspump/ldap/connection.py +++ b/bspump/ldap/connection.py @@ -1,3 +1,4 @@ +import contextlib import logging import ldap import ldap.resiter @@ -58,10 +59,10 @@ def __init__(self, app, id=None, config=None): self.TLSEnabled = len(self.Config.get("tls_cafile")) > 0 self.URI = self.Config.get("uri") self.Host = self.Config.get("host") - self.Port = self.Config.get("port") + self.Port = self.Config.getint("port") if self.Port == 0: self.Port = 636 if self.TLSEnabled else 389 - if len(self.LdapUri) == 0: + if len(self.URI) == 0: self.URI = "{scheme}://{host}:{port}".format( scheme="ldaps" if self.TLSEnabled else "ldap", host=self.Host, @@ -70,7 +71,7 @@ def __init__(self, app, id=None, config=None): @contextlib.contextmanager def ldap_client(self): - client = LDAPObject(self.LdapUri) + client = LDAPObject(self.URI) client.protocol_version = ldap.VERSION3 client.set_option(ldap.OPT_REFERRALS, 0) client.set_option(ldap.OPT_NETWORK_TIMEOUT, int(self.Config.get("network_timeout"))) diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py index 5f8f4a41..c87eba1d 100644 --- a/bspump/ldap/source.py +++ b/bspump/ldap/source.py @@ -1,6 +1,8 @@ -import asyncio import logging -from bspump.abc.source import TriggerSource +import ldap +import ldap.controls + +from ..abc.source import TriggerSource # @@ -13,21 +15,23 @@ class LDAPSource(TriggerSource): ConfigDefaults = { "filter": "(&(objectClass=inetOrgPerson)(cn=*))", "attributes": "sAMAccountName cn createTimestamp modifyTimestamp UserAccountControl email", - "_results_per_page": 1000, + "results_per_page": 1000, } def __init__(self, app, pipeline, connection, query_parms=None, id=None, config=None): super().__init__(app, pipeline, id=id, config=config) - self.Scope = ldap.SCOPE_SUBTREE self.Connection = pipeline.locate_connection(app, connection) + self.ProactorService = app.get_service("asab.ProactorService") + self.Scope = ldap.SCOPE_SUBTREE self.Base = self.Config.get("base") self.Filter = self.Config.get("filter") self.Attributes = self.Config.get("attributes").split(" ") - self.ResultsPerPage = self.Config.getint("_results_per_page") + self.ResultsPerPage = self.Config.getint("results_per_page") async def cycle(self): # TODO: Throttling await self.Pipeline.ready() + cookie = b"" while True: page, cookie = await self.ProactorService.execute( self._search_worker, cookie) From 6a4b949011c73e6b3d0816b3740e66424f07611d Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Mon, 12 Dec 2022 10:56:45 +0100 Subject: [PATCH 04/10] LDAP: Debugging --- bspump/ldap/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py index c87eba1d..99a2eff8 100644 --- a/bspump/ldap/source.py +++ b/bspump/ldap/source.py @@ -18,7 +18,7 @@ class LDAPSource(TriggerSource): "results_per_page": 1000, } - def __init__(self, app, pipeline, connection, query_parms=None, id=None, config=None): + def __init__(self, app, pipeline, connection, id=None, config=None): super().__init__(app, pipeline, id=id, config=config) self.Connection = pipeline.locate_connection(app, connection) self.ProactorService = app.get_service("asab.ProactorService") From 13f7563b1ac91df98989d50136698c27ef8e7581 Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Mon, 12 Dec 2022 10:56:51 +0100 Subject: [PATCH 05/10] LDAP: Add example --- examples/bspump-ldap-source.py | 65 ++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100755 examples/bspump-ldap-source.py diff --git a/examples/bspump-ldap-source.py b/examples/bspump-ldap-source.py new file mode 100755 index 00000000..63416074 --- /dev/null +++ b/examples/bspump-ldap-source.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +import logging + +import bspump +import bspump.common +import bspump.ldap +import bspump.trigger + +### + +L = logging.getLogger(__name__) + +### + + +class NormalizeProcessor(bspump.Processor): + def process(self, context, event): + if "sAMAccountName" in event: + event["username"] = event.pop("sAMAccountName").decode("utf8") + if "UserAccountControl" in event: + event["suspended"] = int(event.pop("UserAccountControl")) & 2 == 2 + return event + + +class LDAPPipeline(bspump.Pipeline): + def __init__(self, app, pipeline_id): + super().__init__(app, pipeline_id) + self.Sink = bspump.common.PPrintSink(app, self) + self.build( + bspump.ldap.LDAPSource(app, self, "LDAPConnection", config={ + "filter": "(&(objectClass=inetOrgPerson)(cn=*))", + "attributes": "sAMAccountName UserAccountControl email" + }).on(bspump.trigger.PubSubTrigger(app, "RunLDAPPipeline!")), + NormalizeProcessor(app, self), + self.Sink + ) + + +if __name__ == "__main__": + """ + This simple pipeline connects to LDAP, retrieves user data and prints them to stdout. + + Make sure to update the connection and source config with the credentials for your LDAP server. + """ + app = bspump.BSPumpApplication() + svc = app.get_service("bspump.PumpService") + + # Create LDAP connection + svc.add_connection( + bspump.ldap.LDAPConnection(app, "LDAPConnection", config={ + "host": "localhost", + "username": "cn=admin,dc=example,dc=org", + "password": "adminpassword", + "base": "dc=example,dc=org", + }) + ) + + # Construct and register the pipeline + pl = LDAPPipeline(app, "LDAPPipeline") + svc.add_pipeline(pl) + + # Trigger the pipeline + app.PubSub.publish("RunLDAPPipeline!", asynchronously=True) + + app.run() From 18646f7e14efe6191c9af024bc074a8188fc55fa Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Wed, 4 Jan 2023 11:43:57 +0100 Subject: [PATCH 06/10] LDAP: Debugging --- bspump/ldap/connection.py | 5 +++-- bspump/ldap/source.py | 1 + examples/bspump-ldap-source.py | 24 +++++++----------------- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/bspump/ldap/connection.py b/bspump/ldap/connection.py index 18ba4413..ddbb0461 100644 --- a/bspump/ldap/connection.py +++ b/bspump/ldap/connection.py @@ -38,7 +38,6 @@ class LDAPConnection(Connection): "uri": "", "username": "cn=admin,dc=example,dc=org", "password": "admin", - "base": "dc=example,dc=org", # Path to CA file in PEM format "tls_cafile": "", @@ -78,9 +77,11 @@ def ldap_client(self): if self.TLSEnabled: self._enable_tls(client) - client.simple_bind_s(self.Config.get("username"), self.Config.get("password")) try: + client.simple_bind_s(self.Config.get("username"), self.Config.get("password")) yield client + except Exception as e: + L.error("Cannot connect to LDAP server: {}".format(e), exc_info=True, struct_data={"ldap_uri": self.URI}) finally: client.unbind_s() diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py index 99a2eff8..3dd246c2 100644 --- a/bspump/ldap/source.py +++ b/bspump/ldap/source.py @@ -13,6 +13,7 @@ class LDAPSource(TriggerSource): ConfigDefaults = { + "base": "dc=example,dc=org", "filter": "(&(objectClass=inetOrgPerson)(cn=*))", "attributes": "sAMAccountName cn createTimestamp modifyTimestamp UserAccountControl email", "results_per_page": 1000, diff --git a/examples/bspump-ldap-source.py b/examples/bspump-ldap-source.py index 63416074..bdcb7c97 100755 --- a/examples/bspump-ldap-source.py +++ b/examples/bspump-ldap-source.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 import logging - import bspump import bspump.common -import bspump.ldap import bspump.trigger +import bspump.ldap ### @@ -16,23 +15,19 @@ class NormalizeProcessor(bspump.Processor): def process(self, context, event): if "sAMAccountName" in event: - event["username"] = event.pop("sAMAccountName").decode("utf8") - if "UserAccountControl" in event: - event["suspended"] = int(event.pop("UserAccountControl")) & 2 == 2 + event["username"] = event.pop("sAMAccountName").pop().decode("utf8") + if "userAccountControl" in event: + event["suspended"] = int(event.pop("userAccountControl").pop()) & 2 == 2 return event class LDAPPipeline(bspump.Pipeline): def __init__(self, app, pipeline_id): super().__init__(app, pipeline_id) - self.Sink = bspump.common.PPrintSink(app, self) self.build( - bspump.ldap.LDAPSource(app, self, "LDAPConnection", config={ - "filter": "(&(objectClass=inetOrgPerson)(cn=*))", - "attributes": "sAMAccountName UserAccountControl email" - }).on(bspump.trigger.PubSubTrigger(app, "RunLDAPPipeline!")), + bspump.ldap.LDAPSource(app, self, "LDAPConnection").on(bspump.trigger.PubSubTrigger(app, "RunLDAPPipeline!")), NormalizeProcessor(app, self), - self.Sink + bspump.common.PPrintSink(app, self) ) @@ -47,12 +42,7 @@ def __init__(self, app, pipeline_id): # Create LDAP connection svc.add_connection( - bspump.ldap.LDAPConnection(app, "LDAPConnection", config={ - "host": "localhost", - "username": "cn=admin,dc=example,dc=org", - "password": "adminpassword", - "base": "dc=example,dc=org", - }) + bspump.ldap.LDAPConnection(app, "LDAPConnection") ) # Construct and register the pipeline From 32da4efd6a905463fdbbd67094ec5173c3f80572 Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Wed, 4 Jan 2023 14:07:04 +0100 Subject: [PATCH 07/10] LDAP source: Normalize output events --- bspump/ldap/source.py | 23 ++++++++++++++++++++++- examples/bspump-ldap-source.py | 4 ++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py index 3dd246c2..d6e4403b 100644 --- a/bspump/ldap/source.py +++ b/bspump/ldap/source.py @@ -17,6 +17,7 @@ class LDAPSource(TriggerSource): "filter": "(&(objectClass=inetOrgPerson)(cn=*))", "attributes": "sAMAccountName cn createTimestamp modifyTimestamp UserAccountControl email", "results_per_page": 1000, + "attribute_encoding": "utf-8", } def __init__(self, app, pipeline, connection, id=None, config=None): @@ -27,6 +28,7 @@ def __init__(self, app, pipeline, connection, id=None, config=None): self.Base = self.Config.get("base") self.Filter = self.Config.get("filter") self.Attributes = self.Config.get("attributes").split(" ") + self.AttributeEncoding = self.Config.get("attribute_encoding") self.ResultsPerPage = self.Config.getint("results_per_page") async def cycle(self): @@ -58,12 +60,31 @@ def _search_worker(self, cookie=b""): ) res_type, res_data, res_msgid, serverctrls = client.result3(msgid) for dn, attrs in res_data: - page.append({"dn": dn, **attrs}) + if dn is None: + # Skip system entries + continue + + event = {"dn": dn} + # LDAP returns all attributes as lists of bytestrings, e.g.: + # {"sAMAccountName": [b"vhavel"], ...} + # Unpack and decode them + for k, v in attrs.items: + if isinstance(v, list): + if len(v) < 1: + continue + elif len(v) == 1: + v = v[0].decode(self.AttributeEncoding) + else: + v = [item.decode(self.AttributeEncoding) for item in v] + event[k] = v + page.append(event) for sc in serverctrls: if sc.controlType == ldap.controls.SimplePagedResultsControl.controlType: cookie = sc.cookie + break else: L.error("Server ignores RFC 2696 control: No serverctrls in result") + cookie = b"" return page, cookie diff --git a/examples/bspump-ldap-source.py b/examples/bspump-ldap-source.py index bdcb7c97..6e43de03 100755 --- a/examples/bspump-ldap-source.py +++ b/examples/bspump-ldap-source.py @@ -15,9 +15,9 @@ class NormalizeProcessor(bspump.Processor): def process(self, context, event): if "sAMAccountName" in event: - event["username"] = event.pop("sAMAccountName").pop().decode("utf8") + event["username"] = event.pop("sAMAccountName") if "userAccountControl" in event: - event["suspended"] = int(event.pop("userAccountControl").pop()) & 2 == 2 + event["suspended"] = int(event.pop("userAccountControl")) & 2 == 2 return event From 3f89fb3106af6f2d639f2ae93844ee94554ce676 Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Wed, 4 Jan 2023 15:46:14 +0100 Subject: [PATCH 08/10] LDAP source: Debugging --- bspump/ldap/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py index d6e4403b..b510132a 100644 --- a/bspump/ldap/source.py +++ b/bspump/ldap/source.py @@ -68,7 +68,7 @@ def _search_worker(self, cookie=b""): # LDAP returns all attributes as lists of bytestrings, e.g.: # {"sAMAccountName": [b"vhavel"], ...} # Unpack and decode them - for k, v in attrs.items: + for k, v in attrs.items(): if isinstance(v, list): if len(v) < 1: continue From 5ba66483ff63d2f6f0144466c5e9c6f37afa3f09 Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Wed, 4 Jan 2023 15:49:37 +0100 Subject: [PATCH 09/10] LDAP source: Debugging --- bspump/ldap/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py index b510132a..b0808f41 100644 --- a/bspump/ldap/source.py +++ b/bspump/ldap/source.py @@ -38,7 +38,7 @@ async def cycle(self): while True: page, cookie = await self.ProactorService.execute( self._search_worker, cookie) - async for entry in page: + for entry in page: await self.process(entry, context={}) if cookie is None or len(cookie) == 0: break From e4438e51bb1fbc604c7f28b4927a91debf15d9d4 Mon Sep 17 00:00:00 2001 From: "robin.hruska@teskalabs.com" Date: Thu, 5 Jan 2023 12:40:32 +0100 Subject: [PATCH 10/10] LDAP source: Docstring --- bspump/ldap/connection.py | 2 ++ bspump/ldap/source.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bspump/ldap/connection.py b/bspump/ldap/connection.py index ddbb0461..7ac5a129 100644 --- a/bspump/ldap/connection.py +++ b/bspump/ldap/connection.py @@ -30,6 +30,8 @@ class LDAPConnection(Connection): uri=ldaps://localhost:636 username=cn=admin,dc=example,dc=org password=abc123def456 + tls_cafile=/conf/cert.ca + tls_require_cert=allow """ ConfigDefaults = { diff --git a/bspump/ldap/source.py b/bspump/ldap/source.py index b0808f41..3c460cc0 100644 --- a/bspump/ldap/source.py +++ b/bspump/ldap/source.py @@ -84,7 +84,7 @@ def _search_worker(self, cookie=b""): cookie = sc.cookie break else: - L.error("Server ignores RFC 2696 control: No serverctrls in result") + L.error("No SimplePagedResultsControl cookie in result serverctrls") cookie = b"" return page, cookie