-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
ldap_auth.py
98 lines (76 loc) · 2.92 KB
/
ldap_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import sys
from flask import Blueprint, flash, redirect, render_template, request, url_for
from flask_login import current_user
from redash import settings
try:
from ldap3 import Connection, Server
except ImportError:
if settings.LDAP_LOGIN_ENABLED:
sys.exit(
"The ldap3 library was not found. This is required to use LDAP authentication. Rebuild the Docker image installing the `ldap3` poetry dependency group."
)
from redash.authentication import (
create_and_login_user,
get_next_path,
logout_and_redirect_to_index,
)
from redash.authentication.org_resolving import current_org
from redash.handlers.base import org_scoped_rule
logger = logging.getLogger("ldap_auth")
blueprint = Blueprint("ldap_auth", __name__)
@blueprint.route(org_scoped_rule("/ldap/login"), methods=["GET", "POST"])
def login(org_slug=None):
index_url = url_for("redash.index", org_slug=org_slug)
unsafe_next_path = request.args.get("next", index_url)
next_path = get_next_path(unsafe_next_path)
if not settings.LDAP_LOGIN_ENABLED:
logger.error("Cannot use LDAP for login without being enabled in settings")
return redirect(url_for("redash.index", next=next_path))
if current_user.is_authenticated:
return redirect(next_path)
if request.method == "POST":
ldap_user = auth_ldap_user(request.form["email"], request.form["password"])
if ldap_user is not None:
user = create_and_login_user(
current_org,
ldap_user[settings.LDAP_DISPLAY_NAME_KEY][0],
ldap_user[settings.LDAP_EMAIL_KEY][0],
)
if user is None:
return logout_and_redirect_to_index()
return redirect(next_path or url_for("redash.index"))
else:
flash("Incorrect credentials.")
return render_template(
"login.html",
org_slug=org_slug,
next=next_path,
email=request.form.get("email", ""),
show_password_login=True,
username_prompt=settings.LDAP_CUSTOM_USERNAME_PROMPT,
hide_forgot_password=True,
)
def auth_ldap_user(username, password):
server = Server(settings.LDAP_HOST_URL, use_ssl=settings.LDAP_SSL)
if settings.LDAP_BIND_DN is not None:
conn = Connection(
server,
settings.LDAP_BIND_DN,
password=settings.LDAP_BIND_DN_PASSWORD,
authentication=settings.LDAP_AUTH_METHOD,
auto_bind=True,
)
else:
conn = Connection(server, auto_bind=True)
conn.search(
settings.LDAP_SEARCH_DN,
settings.LDAP_SEARCH_TEMPLATE % {"username": username},
attributes=[settings.LDAP_DISPLAY_NAME_KEY, settings.LDAP_EMAIL_KEY],
)
if len(conn.entries) == 0:
return None
user = conn.entries[0]
if not conn.rebind(user=user.entry_dn, password=password):
return None
return user