Skip to content

Commit

Permalink
Implement is_authorized in auth manager
Browse files Browse the repository at this point in the history
  • Loading branch information
vincbeck committed Aug 15, 2023
1 parent e57d0c9 commit b34afaa
Show file tree
Hide file tree
Showing 14 changed files with 585 additions and 147 deletions.
72 changes: 71 additions & 1 deletion airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Sequence

from airflow.auth.managers.models.base_user import BaseUser
from airflow.auth.managers.models.resource_action import ResourceAction
from airflow.auth.managers.models.resource_details import ResourceDetails
from airflow.exceptions import AirflowException
from airflow.models.dag import DagModel
from airflow.security.permissions import RESOURCE_DAG, RESOURCE_DAG_PREFIX
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
Expand Down Expand Up @@ -54,6 +58,67 @@ def get_user_id(self) -> str:
def is_logged_in(self) -> bool:
"""Return whether the user is logged in."""

@abstractmethod
def is_authorized(
self,
action: ResourceAction,
resource_type: str,
resource_details: ResourceDetails | None = None,
user: BaseUser | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action.
.. code-block:: python
# Check whether the logged-in user has permission to read the DAG "my_dag_id"
get_auth_manager().is_authorized(
Action.GET,
Resource.DAG,
ResourceDetails(
id="my_dag_id",
),
)
:param action: the action to perform
:param resource_type: the type of resource the user attempts to perform the action on
:param resource_details: optional details about the resource itself
:param user: the user to perform the action on. If not provided (or None), it uses the current user
"""

def is_all_authorized(
self,
actions: Sequence[tuple[ResourceAction, str, ResourceDetails | None]],
) -> bool:
"""
Wrapper around `is_authorized` to check whether the user is authorized to perform several actions.
:param actions: the list of actions to check. Each tuple is a list of parameters of `is_authorized`
"""
return all(
self.is_authorized(
action=action[0],
resource_type=action[1],
resource_details=action[2] if len(action) == 3 else None,
)
for action in actions
)

def _get_root_dag_id(self, dag_id: str) -> str:
"""
Return the root DAG id in case of sub DAG, return the DAG id otherwise.
:param dag_id: the DAG id
"""
if "." in dag_id:
dm = (
self.security_manager.appbuilder.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
.filter(DagModel.dag_id == dag_id)
.first()
)
return dm.root_dag_id or dm.dag_id
return dag_id

@abstractmethod
def get_url_login(self, **kwargs) -> str:
"""Return the login page url."""
Expand Down Expand Up @@ -93,3 +158,8 @@ def security_manager(self, security_manager: AirflowSecurityManager):
:param security_manager: the security manager
"""
self._security_manager = security_manager

@staticmethod
def is_dag_resource(resource_name: str) -> bool:
"""Determines if a resource relates to a DAG."""
return resource_name == RESOURCE_DAG or resource_name.startswith(RESOURCE_DAG_PREFIX)
99 changes: 99 additions & 0 deletions airflow/auth/managers/fab/fab_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,37 @@
# under the License.
from __future__ import annotations

import itertools

from flask import url_for
from flask_login import current_user

from airflow import AirflowException
from airflow.auth.managers.base_auth_manager import BaseAuthManager
from airflow.auth.managers.fab.models import User
from airflow.auth.managers.fab.security_manager.override import FabAirflowSecurityManagerOverride
from airflow.auth.managers.models.base_user import BaseUser
from airflow.auth.managers.models.resource_action import ResourceAction
from airflow.auth.managers.models.resource_details import ResourceDetails
from airflow.security.permissions import (
ACTION_CAN_ACCESS_MENU,
ACTION_CAN_CREATE,
ACTION_CAN_DELETE,
ACTION_CAN_EDIT,
ACTION_CAN_READ,
RESOURCE_DAG,
RESOURCE_DAG_PREFIX,
)

_MAP_ACTION_NAME_TO_FAB_ACTION_NAME = {
ResourceAction.POST: [ACTION_CAN_CREATE],
# ACTION_CAN_READ and ACTION_CAN_ACCESS_MENU are merged into because they are very similar.
# We can assume that if a user has permissions to read variables, they also have permissions to access
# the menu "Variables".
ResourceAction.GET: [ACTION_CAN_READ, ACTION_CAN_ACCESS_MENU],
ResourceAction.PUT: [ACTION_CAN_EDIT],
ResourceAction.DELETE: [ACTION_CAN_DELETE],
}


class FabAuthManager(BaseAuthManager):
Expand Down Expand Up @@ -57,6 +81,49 @@ def is_logged_in(self) -> bool:
"""Return whether the user is logged in."""
return not self.get_user().is_anonymous

def is_authorized(
self,
action: ResourceAction,
resource_type: str,
resource_details: ResourceDetails | None = None,
user: BaseUser | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action.
:param action: the action to perform
:param resource_type: the type of resource the user attempts to perform the action on
:param resource_details: optional details about the resource itself
:param user: the user to perform the action on. If not provided (or None), it uses the current user
"""
if not user:
user = self.get_user()

fab_actions = self._get_fab_actions(action)
# `permissions` is a list of tuples. Each tuple contains a FAB action name and a resource name.
# For example, if the user has permission to create a task, the tuple will be ("can_create", "task").
# It contains all combinations from the list of FAB actions and the resource name.
permissions = list(itertools.product(fab_actions, [resource_type]))

if any((action_name, resource_name) in user.perms for action_name, resource_name in permissions):
return True

if self.is_dag_resource(resource_type):
# Check whether the user has permissions to access all DAGs
if any((action_name, RESOURCE_DAG) in user.perms for action_name, resource_name in permissions):
return True

if resource_details and resource_details.id:
# Check whether the user has permissions to access a specific DAG
root_dag_id = self._get_root_dag_id(resource_details.id)
resource_dag_name = self._resource_name_for_dag(root_dag_id)
return any(
(action_name, resource_dag_name) in user.perms
for action_name, resource_name in permissions
)

return False

def get_security_manager_override_class(self) -> type:
"""Return the security manager override."""
return FabAirflowSecurityManagerOverride
Expand All @@ -81,3 +148,35 @@ def get_url_user_profile(self) -> str | None:
if not self.security_manager.user_view:
return None
return url_for(f"{self.security_manager.user_view.endpoint}.userinfo")

@staticmethod
def _get_fab_actions(action: ResourceAction) -> list[str]:
"""
Convert the action to a list of FAB actions.
:param action: the action to convert
:meta private:
"""
if action not in _MAP_ACTION_NAME_TO_FAB_ACTION_NAME:
raise AirflowException(f"Unknown action: {action}")
return _MAP_ACTION_NAME_TO_FAB_ACTION_NAME[action]

@staticmethod
def _resource_name_for_dag(dag_id: str) -> str:
"""
Returns the FAB resource name for a DAG id.
Note that since a sub-DAG should follow the permission of its parent DAG, you should pass
``DagModel.root_dag_id`` to this function, for a subdag. A normal dag should pass the
``DagModel.dag_id``.
:param dag_id: the DAG id
:meta private:
"""
if dag_id == RESOURCE_DAG:
return dag_id
if dag_id.startswith(RESOURCE_DAG_PREFIX):
return dag_id
return f"{RESOURCE_DAG_PREFIX}{dag_id}"
6 changes: 6 additions & 0 deletions airflow/auth/managers/models/base_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Any


class BaseUser:
Expand All @@ -37,6 +38,11 @@ def is_active(self) -> bool:
def is_anonymous(self) -> bool:
...

@property
@abstractmethod
def perms(self) -> Any:
...

@abstractmethod
def get_id(self) -> str:
...
36 changes: 36 additions & 0 deletions airflow/auth/managers/models/resource_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from enum import Enum


class ResourceAction(Enum):
"""
This is used when doing authorization check to define the type of action/operation
the user is doing.
"""

Check failure on line 27 in airflow/auth/managers/models/resource_action.py

View workflow job for this annotation

GitHub Actions / Static checks

Ruff (D205)

airflow/auth/managers/models/resource_action.py:24:5: D205 1 blank line required between summary line and description

# Create a resource
POST = "POST"
# Read a resource
GET = "GET"
# Update a resource
PUT = "PUT"
# Delete a resource
DELETE = "DELETE"
31 changes: 31 additions & 0 deletions airflow/auth/managers/models/resource_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ResourceDetails:
"""
Represents the details of a resource.
All fields must be optional. These details can be used in authorization decision.
"""

id: str | None = None
6 changes: 4 additions & 2 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@
from contextlib import contextmanager
from copy import deepcopy
from json.decoder import JSONDecodeError
from typing import IO, Any, Dict, Generator, Iterable, Pattern, Set, Tuple, Union
from typing import IO, TYPE_CHECKING, Any, Dict, Generator, Iterable, Pattern, Set, Tuple, Union
from urllib.parse import urlsplit

import re2
from packaging.version import parse as parse_version
from typing_extensions import overload

from airflow.auth.managers.base_auth_manager import BaseAuthManager
from airflow.exceptions import AirflowConfigException
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
from airflow.utils import yaml
Expand All @@ -52,6 +51,9 @@
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.weight_rule import WeightRule

if TYPE_CHECKING:
from airflow.auth.managers.base_auth_manager import BaseAuthManager

log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
Expand Down
12 changes: 7 additions & 5 deletions airflow/www/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
from functools import wraps
from typing import Callable, Sequence, TypeVar, cast

from flask import current_app, flash, g, redirect, render_template, request
from flask import flash, g, redirect, render_template, request

from airflow.auth.managers.models.resource_action import ResourceAction
from airflow.auth.managers.models.resource_details import ResourceDetails
from airflow.configuration import conf
from airflow.utils.net import get_hostname
from airflow.www.extensions.init_auth_manager import get_auth_manager
Expand All @@ -32,24 +34,24 @@ def get_access_denied_message():
return conf.get("webserver", "access_denied_message")


def has_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable[[T], T]:
def has_access(permissions: Sequence[tuple[ResourceAction, str]] | None = None) -> Callable[[T], T]:
"""Factory for decorator that checks current user's permissions against required permissions."""

def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
__tracebackhide__ = True # Hide from pytest traceback.

appbuilder = current_app.appbuilder

dag_id = (
kwargs.get("dag_id")
or request.args.get("dag_id")
or request.form.get("dag_id")
or (request.is_json and request.json.get("dag_id"))
or None
)
if appbuilder.sm.check_authorization(permissions, dag_id):
resource_details = ResourceDetails(id=dag_id)
actions = [(perm[0], perm[1], resource_details) for perm in permissions]
if get_auth_manager().is_all_authorized(actions):
return func(*args, **kwargs)
elif get_auth_manager().is_logged_in() and not g.user.perms:
return (
Expand Down
6 changes: 0 additions & 6 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,6 @@ def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
return any(self.get_readable_dag_ids(user))
return any(self.get_editable_dag_ids(user))

def can_read_dag(self, dag_id: str, user=None) -> bool:
"""Determines whether a user has DAG read access."""
root_dag_id = self._get_root_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)

def can_edit_dag(self, dag_id: str, user=None) -> bool:
"""Determines whether a user has DAG edit access."""
root_dag_id = self._get_root_dag_id(dag_id)
Expand Down
Loading

0 comments on commit b34afaa

Please sign in to comment.