Skip to content

Commit

Permalink
Merge pull request #156 from LibertyAces/feature/ldap-source
Browse files Browse the repository at this point in the history
LDAP source
  • Loading branch information
byewokko authored Jan 5, 2023
2 parents ee711a2 + e4438e5 commit 801ea4d
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 0 deletions.
7 changes: 7 additions & 0 deletions bspump/ldap/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .connection import LDAPConnection
from .source import LDAPSource

__all__ = (
"LDAPConnection",
"LDAPSource",
)
105 changes: 105 additions & 0 deletions bspump/ldap/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import contextlib
import logging
import ldap
import ldap.resiter

from ..abc.connection import Connection

#

L = logging.getLogger(__name__)

#


class LDAPObject(ldap.ldapobject.LDAPObject, ldap.resiter.ResultProcessor):
pass


class LDAPConnection(Connection):
"""
Examples of configurations:
[connection:ldap]
host=12.34.56.78
port=27017
username=cn=admin,dc=example,dc=org
password=abc123def456
[connection:ldap]
uri=ldaps://localhost:636
username=cn=admin,dc=example,dc=org
password=abc123def456
tls_cafile=/conf/cert.ca
tls_require_cert=allow
"""

ConfigDefaults = {
"host": "localhost",
"port": 0, # = use the default 389 for non-secure and 636 for secure connection
"uri": "",
"username": "cn=admin,dc=example,dc=org",
"password": "admin",

# Path to CA file in PEM format
"tls_cafile": "",

# Certificate policy.
# Possible options (from python-ldap docs):
# "never" - Don’t check server cert and host name
# "allow" - Used internally by slapd server.
# "demand" - Validate peer cert chain and host name
# "hard" - Same as "demand"
"tls_require_cert": "never",

"network_timeout": "10", # set network_timeout to -1 for no timeout
}

def __init__(self, app, id=None, config=None):
super().__init__(app, id=id, config=config)
self.TLSEnabled = len(self.Config.get("tls_cafile")) > 0
self.URI = self.Config.get("uri")
self.Host = self.Config.get("host")
self.Port = self.Config.getint("port")
if self.Port == 0:
self.Port = 636 if self.TLSEnabled else 389
if len(self.URI) == 0:
self.URI = "{scheme}://{host}:{port}".format(
scheme="ldaps" if self.TLSEnabled else "ldap",
host=self.Host,
port=self.Port
)

@contextlib.contextmanager
def ldap_client(self):
client = LDAPObject(self.URI)
client.protocol_version = ldap.VERSION3
client.set_option(ldap.OPT_REFERRALS, 0)
client.set_option(ldap.OPT_NETWORK_TIMEOUT, int(self.Config.get("network_timeout")))
if self.TLSEnabled:
self._enable_tls(client)

try:
client.simple_bind_s(self.Config.get("username"), self.Config.get("password"))
yield client
except Exception as e:
L.error("Cannot connect to LDAP server: {}".format(e), exc_info=True, struct_data={"ldap_uri": self.URI})
finally:
client.unbind_s()

def _enable_tls(self, client):
tls_cafile = self.Config.get("tls_cafile")
tls_require_cert = self.Config.get("tls_require_cert")
if len(tls_cafile) > 0:
client.set_option(ldap.OPT_X_TLS_CACERTFILE, tls_cafile)
if tls_require_cert == "never":
client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
elif tls_require_cert == "demand":
client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
elif tls_require_cert == "allow":
client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
elif tls_require_cert == "hard":
client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
else:
raise ValueError("Invalid 'tls_require_cert' value: {}.")
client.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
90 changes: 90 additions & 0 deletions bspump/ldap/source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
import ldap
import ldap.controls

from ..abc.source import TriggerSource

#

L = logging.getLogger(__name__)

#

class LDAPSource(TriggerSource):

ConfigDefaults = {
"base": "dc=example,dc=org",
"filter": "(&(objectClass=inetOrgPerson)(cn=*))",
"attributes": "sAMAccountName cn createTimestamp modifyTimestamp UserAccountControl email",
"results_per_page": 1000,
"attribute_encoding": "utf-8",
}

def __init__(self, app, pipeline, connection, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
self.Connection = pipeline.locate_connection(app, connection)
self.ProactorService = app.get_service("asab.ProactorService")
self.Scope = ldap.SCOPE_SUBTREE
self.Base = self.Config.get("base")
self.Filter = self.Config.get("filter")
self.Attributes = self.Config.get("attributes").split(" ")
self.AttributeEncoding = self.Config.get("attribute_encoding")
self.ResultsPerPage = self.Config.getint("results_per_page")

async def cycle(self):
# TODO: Throttling
await self.Pipeline.ready()
cookie = b""
while True:
page, cookie = await self.ProactorService.execute(
self._search_worker, cookie)
for entry in page:
await self.process(entry, context={})
if cookie is None or len(cookie) == 0:
break

def _search_worker(self, cookie=b""):
page = []
with self.Connection.ldap_client() as client:
paged_results_control = ldap.controls.SimplePagedResultsControl(
True,
size=self.ResultsPerPage,
cookie=cookie
)
msgid = client.search_ext(
base=self.Base,
scope=self.Scope,
filterstr=self.Filter,
attrlist=self.Attributes,
serverctrls=[paged_results_control],
)
res_type, res_data, res_msgid, serverctrls = client.result3(msgid)
for dn, attrs in res_data:
if dn is None:
# Skip system entries
continue

event = {"dn": dn}
# LDAP returns all attributes as lists of bytestrings, e.g.:
# {"sAMAccountName": [b"vhavel"], ...}
# Unpack and decode them
for k, v in attrs.items():
if isinstance(v, list):
if len(v) < 1:
continue
elif len(v) == 1:
v = v[0].decode(self.AttributeEncoding)
else:
v = [item.decode(self.AttributeEncoding) for item in v]
event[k] = v
page.append(event)

for sc in serverctrls:
if sc.controlType == ldap.controls.SimplePagedResultsControl.controlType:
cookie = sc.cookie
break
else:
L.error("No SimplePagedResultsControl cookie in result serverctrls")
cookie = b""

return page, cookie
55 changes: 55 additions & 0 deletions examples/bspump-ldap-source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
import logging
import bspump
import bspump.common
import bspump.trigger
import bspump.ldap

###

L = logging.getLogger(__name__)

###


class NormalizeProcessor(bspump.Processor):
def process(self, context, event):
if "sAMAccountName" in event:
event["username"] = event.pop("sAMAccountName")
if "userAccountControl" in event:
event["suspended"] = int(event.pop("userAccountControl")) & 2 == 2
return event


class LDAPPipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
bspump.ldap.LDAPSource(app, self, "LDAPConnection").on(bspump.trigger.PubSubTrigger(app, "RunLDAPPipeline!")),
NormalizeProcessor(app, self),
bspump.common.PPrintSink(app, self)
)


if __name__ == "__main__":
"""
This simple pipeline connects to LDAP, retrieves user data and prints them to stdout.
Make sure to update the connection and source config with the credentials for your LDAP server.
"""
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

# Create LDAP connection
svc.add_connection(
bspump.ldap.LDAPConnection(app, "LDAPConnection")
)

# Construct and register the pipeline
pl = LDAPPipeline(app, "LDAPPipeline")
svc.add_pipeline(pl)

# Trigger the pipeline
app.PubSub.publish("RunLDAPPipeline!", asynchronously=True)

app.run()
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def run(self):
'xxhash>=1.4.4',
'orjson>=3.4.7',
],
extras_require={
'ldap': 'python-ldap',
},
scripts=[
'utils/bselastic',
'utils/bskibana',
Expand Down

0 comments on commit 801ea4d

Please sign in to comment.