Skip to content

Commit

Permalink
Remove user sessions when resetting password (#33347)
Browse files Browse the repository at this point in the history
* Remove user sessions when resetting password

When user's password is reset, we also remove all DB sessions
for that user - for database session backend.

In case we are using securecookie mechanism, resetting password does
not invalidate old sessions, so instead we are displaying warning to
the user performing the reset that in order to clear existing
sessions of the user, the secure_key needs to be changed and it
will invalidate all sessions for all users.

Protection has been added in case the number of sessions in the DB
is too big to effectively scan and remove sessions for the user. In
such case we print warning for the user that sessions have not
been reset, and we suggest to improve the way their deployment
mechanisms create too many sessions - by either changing the way
how automation of the API calls is done and/or by purging the
sessions regularly by "airflow db clean".

* Update airflow/auth/managers/fab/security_manager/override.py

Co-authored-by: Hussein Awala <hussein@awala.fr>

---------

Co-authored-by: Hussein Awala <hussein@awala.fr>
(cherry picked from commit 2caa186)
  • Loading branch information
potiuk authored and ephraimbuddy committed Aug 14, 2023
1 parent a817f21 commit f5d8201
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 2 deletions.
53 changes: 52 additions & 1 deletion airflow/auth/managers/fab/security_manager/override.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@

from functools import cached_property

from flask import g
from flask import flash, g
from flask_appbuilder.const import AUTH_DB, AUTH_LDAP, AUTH_OAUTH, AUTH_OID, AUTH_REMOTE_USER
from flask_babel import lazy_gettext
from flask_jwt_extended import JWTManager
from flask_login import LoginManager
from itsdangerous import want_bytes
from markupsafe import Markup
from werkzeug.security import generate_password_hash

from airflow.auth.managers.fab.models import User
from airflow.auth.managers.fab.models.anonymous_user import AnonymousUser
from airflow.www.session import AirflowDatabaseSessionInterface

# This is the limit of DB user sessions that we consider as "healthy". If you have more sessions that this
# number then we will refuse to delete sessions that have expired and old user sessions when resetting
# user's password, and raise a warning in the UI instead. Usually when you have that many sessions, it means
# that there is something wrong with your deployment - for example you have an automated API call that
# continuously creates new sessions. Such setup should be fixed by reusing sessions or by periodically
# purging the old sessions by using `airflow db clean` command.
MAX_NUM_DATABASE_USER_SESSIONS = 50000


class FabAirflowSecurityManagerOverride:
Expand Down Expand Up @@ -230,8 +242,47 @@ def reset_password(self, userid, password):
"""
user = self.get_user_by_id(userid)
user.password = generate_password_hash(password)
self.reset_user_sessions(user)
self.update_user(user)

def reset_user_sessions(self, user: User) -> None:
if isinstance(self.appbuilder.get_app.session_interface, AirflowDatabaseSessionInterface):
interface = self.appbuilder.get_app.session_interface
session = interface.db.session
user_session_model = interface.sql_session_model
num_sessions = session.query(user_session_model).count()
if num_sessions > MAX_NUM_DATABASE_USER_SESSIONS:
flash(
Markup(
f"The old sessions for user {user.username} have <b>NOT</b> been deleted!<br>"
f"You have a lot ({num_sessions}) of user sessions in the 'SESSIONS' table in "
f"your database.<br> "
"This indicates that this deployment might have an automated API calls that create "
"and not reuse sessions.<br>You should consider reusing sessions or cleaning them "
"periodically using db clean.<br>"
"Make sure to reset password for the user again after cleaning the session table "
"to remove old sessions of the user."
),
"warning",
)
else:
for s in session.query(user_session_model):
session_details = interface.serializer.loads(want_bytes(s.data))
if session_details.get("_user_id") == user.id:
session.delete(s)
else:
flash(
Markup(
"Since you are using `securecookie` session backend mechanism, we cannot prevent "
f"some old sessions for user {user.username} to be reused.<br> If you want to make sure "
"that the user is logged out from all sessions, you should consider using "
"`database` session backend mechanism.<br> You can also change the 'secret_key` "
"webserver configuration for all your webserver instances and restart the webserver. "
"This however will logout all users from all sessions."
),
"warning",
)

def load_user(self, user_id):
"""Load user by ID."""
return self.get_user_by_id(int(user_id))
Expand Down
14 changes: 13 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,19 @@ webserver:
default: ""
session_backend:
description: |
The type of backend used to store web session data, can be 'database' or 'securecookie'
The type of backend used to store web session data, can be `database` or `securecookie`. For the
`database` backend, sessions are store in the database (in `session` table) and they can be
managed there (for example when you reset password of the user, all sessions for that user are
deleted). For the `securecookie` backend, sessions are stored in encrypted cookies on the client
side. The `securecookie` mechanism is 'lighter' than database backend, but sessions are not deleted
when you reset password of the user, which means that other than waiting for expiry time, the only
way to invalidate all sessions for a user is to change secret_key and restart webserver (which
also invalidates and logs out all other user's sessions).
When you are using `database` backend, make sure to keep your database session table small
by periodically running `airflow db clean --table session` command, especially if you have
automated API calls that will create a new session for each call rather than reuse the sessions
stored in browser cookies.
version_added: 2.2.4
type: string
example: securecookie
Expand Down
4 changes: 4 additions & 0 deletions airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from airflow import AirflowException
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.models import Base
from airflow.utils import timezone
from airflow.utils.db import reflect_tables
Expand Down Expand Up @@ -115,6 +116,9 @@ def readable_config(self):
_TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"),
]

if conf.get("webserver", "session_backend") == "database":
config_list.append(_TableConfig(table_name="session", recency_column_name="expiry"))

config_dict: dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}


Expand Down
112 changes: 112 additions & 0 deletions tests/www/views/test_views_custom_user_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timedelta
from unittest import mock

import pytest
from flask.sessions import SecureCookieSessionInterface
from flask_appbuilder import SQLA

from airflow import settings
Expand Down Expand Up @@ -166,3 +170,111 @@ def test_user_model_view_with_delete_access(self):
check_content_in_response("Deleted Row", response)
check_content_not_in_response(user_to_delete.username, response)
assert bool(self.security_manager.get_user_by_id(user_to_delete.id)) is False


# type: ignore[attr-defined]


class TestResetUserSessions:
@classmethod
def setup_class(cls):
settings.configure_orm()

def setup_method(self):
# We cannot reuse the app in tests (on class level) as in Flask 2.2 this causes
# an exception because app context teardown is removed and if even single request is run via app
# it cannot be re-intialized again by passing it as constructor to SQLA
# This makes the tests slightly slower (but they work with Flask 2.1 and 2.2
self.app = application.create_app(testing=True)
self.appbuilder = self.app.appbuilder
self.app.config["WTF_CSRF_ENABLED"] = False
self.security_manager = self.appbuilder.sm
self.interface = self.app.session_interface
self.model = self.interface.sql_session_model
self.serializer = self.interface.serializer
self.db = self.interface.db
self.db.session.query(self.model).delete()
self.db.session.commit()
self.db.session.flush()
self.user_1 = create_user(
self.app,
username="user_to_delete_1",
role_name="user_to_delete",
)
self.user_2 = create_user(
self.app,
username="user_to_delete_2",
role_name="user_to_delete",
)
self.db.session.commit()
self.db.session.flush()

def create_user_db_session(self, session_id: str, time_delta: timedelta, user_id: int):
self.db.session.add(
self.model(
session_id=session_id,
data=self.serializer.dumps({"_user_id": user_id}),
expiry=datetime.now() + time_delta,
)
)

@pytest.mark.parametrize(
"time_delta, user_sessions_deleted",
[
pytest.param(timedelta(days=-1), True, id="Both expired"),
pytest.param(timedelta(hours=1), True, id="Both fresh"),
pytest.param(timedelta(days=1), True, id="Both future"),
],
)
def test_reset_user_sessions_delete(self, time_delta: timedelta, user_sessions_deleted: bool):

self.create_user_db_session("session_id_1", time_delta, self.user_1.id)
self.create_user_db_session("session_id_2", time_delta, self.user_2.id)
self.db.session.commit()
self.db.session.flush()
assert self.db.session.query(self.model).count() == 2
assert self.get_session_by_id("session_id_1") is not None
assert self.get_session_by_id("session_id_2") is not None

self.security_manager.reset_password(self.user_1.id, "new_password")
self.db.session.commit()
self.db.session.flush()
if user_sessions_deleted:
assert self.db.session.query(self.model).count() == 1
assert self.get_session_by_id("session_id_1") is None
else:
assert self.db.session.query(self.model).count() == 2
assert self.get_session_by_id("session_id_1") is not None

def get_session_by_id(self, session_id: str):
return self.db.session.query(self.model).filter(self.model.session_id == session_id).scalar()

@mock.patch("airflow.auth.managers.fab.security_manager.override.flash")
@mock.patch("airflow.auth.managers.fab.security_manager.override.MAX_NUM_DATABASE_USER_SESSIONS", 1)
def test_refuse_delete(self, flash_mock):
self.create_user_db_session("session_id_1", timedelta(days=1), self.user_1.id)
self.create_user_db_session("session_id_2", timedelta(days=1), self.user_2.id)
self.db.session.commit()
self.db.session.flush()
assert self.db.session.query(self.model).count() == 2
assert self.get_session_by_id("session_id_1") is not None
assert self.get_session_by_id("session_id_2") is not None
self.security_manager.reset_password(self.user_1.id, "new_password")
assert flash_mock.called
assert (
"The old sessions for user user_to_delete_1 have <b>NOT</b> been deleted!"
in flash_mock.call_args[0][0]
)
assert self.db.session.query(self.model).count() == 2
assert self.get_session_by_id("session_id_1") is not None
assert self.get_session_by_id("session_id_2") is not None

@mock.patch("airflow.auth.managers.fab.security_manager.override.flash")
def test_warn_securecookie(self, flash_mock):
self.app.session_interface = SecureCookieSessionInterface()
self.security_manager.reset_password(self.user_1.id, "new_password")
assert flash_mock.called
assert (
"Since you are using `securecookie` session backend mechanism, we cannot"
in flash_mock.call_args[0][0]
)

0 comments on commit f5d8201

Please sign in to comment.