Skip to content

Commit

Permalink
Implement is_authorized_variable in AWS auth manager (#35804)
Browse files Browse the repository at this point in the history
  • Loading branch information
vincbeck authored Nov 27, 2023
1 parent 99b4eb7 commit 3b3ebaf
Show file tree
Hide file tree
Showing 17 changed files with 617 additions and 17 deletions.
16 changes: 16 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/avp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
57 changes: 57 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/avp/entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from airflow.auth.managers.base_auth_manager import ResourceMethod

AVP_PREFIX_ENTITIES = "Airflow::"


class AvpEntities(Enum):
"""Enum of Amazon Verified Permissions entities."""

ACTION = "Action"
ROLE = "Role"
VARIABLE = "Variable"
USER = "User"


def get_entity_type(resource_type: AvpEntities) -> str:
"""
Return entity type.
:param resource_type: Resource type.
Example: Airflow::Action, Airflow::Role, Airflow::Variable, Airflow::User.
"""
return AVP_PREFIX_ENTITIES + resource_type.value


def get_action_id(resource_type: AvpEntities, method: ResourceMethod):
"""
Return action id.
Convention for action ID is <resource_type>::<method>. Example: Variable::GET.
:param resource_type: Resource type.
:param method: Resource method.
"""
return f"{resource_type.value}::{method}"
126 changes: 126 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/avp/facade.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Callable

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities, get_action_id, get_entity_type
from airflow.providers.amazon.aws.auth_manager.constants import (
CONF_AVP_POLICY_STORE_ID_KEY,
CONF_CONN_ID_KEY,
CONF_SECTION_NAME,
)
from airflow.providers.amazon.aws.hooks.verified_permissions import VerifiedPermissionsHook
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.auth.managers.base_auth_manager import ResourceMethod
from airflow.providers.amazon.aws.auth_manager.user import AwsAuthManagerUser


class AwsAuthManagerAmazonVerifiedPermissionsFacade(LoggingMixin):
"""
Facade for Amazon Verified Permissions.
Used as an intermediate layer between AWS auth manager and Amazon Verified Permissions.
"""

@cached_property
def avp_client(self):
"""Build Amazon Verified Permissions client."""
aws_conn_id = conf.get(CONF_SECTION_NAME, CONF_CONN_ID_KEY)
return VerifiedPermissionsHook(aws_conn_id=aws_conn_id).conn

@cached_property
def avp_policy_store_id(self):
"""Get the Amazon Verified Permission policy store ID from config."""
return conf.get_mandatory_value(CONF_SECTION_NAME, CONF_AVP_POLICY_STORE_ID_KEY)

def is_authorized(
self,
*,
method: ResourceMethod,
entity_type: AvpEntities,
user: AwsAuthManagerUser,
entity_id: str | None = None,
entity_fetcher: Callable | None = None,
) -> bool:
"""
Make an authorization decision against Amazon Verified Permissions.
Check whether the user has permissions to access given resource.
:param method: the method to perform
:param entity_type: the entity type the user accesses
:param user: the user
:param entity_id: the entity ID the user accesses. If not provided, all entities of the type will be
considered.
:param entity_fetcher: function that returns list of entities to be passed to Amazon Verified
Permissions. Only needed if some resource properties are used in the policies (e.g. DAG folder).
"""
entity_list = self._get_user_role_entities(user)
if entity_fetcher and entity_id:
# If no entity ID is provided, there is no need to fetch entities.
# We just need to know whether the user has permissions to access all resources from this type
entity_list += entity_fetcher()

self.log.debug(
"Making authorization request for user=%s, method=%s, entity_type=%s, entity_id=%s",
user.get_id(),
method,
entity_type,
entity_id,
)

resp = self.avp_client.is_authorized(
policyStoreId=self.avp_policy_store_id,
principal={"entityType": get_entity_type(AvpEntities.USER), "entityId": user.get_id()},
action={
"actionType": get_entity_type(AvpEntities.ACTION),
"actionId": get_action_id(entity_type, method),
},
resource={"entityType": get_entity_type(entity_type), "entityId": entity_id or "*"},
entities={"entityList": entity_list},
)

self.log.debug("Authorization response: %s", resp)

if len(resp.get("errors", [])) > 0:
self.log.error(
"Error occurred while making an authorization decision. Errors: %s", resp["errors"]
)
raise AirflowException("Error occurred while making an authorization decision.")

return resp["decision"] == "ALLOW"

@staticmethod
def _get_user_role_entities(user: AwsAuthManagerUser) -> list[dict]:
user_entity = {
"identifier": {"entityType": get_entity_type(AvpEntities.USER), "entityId": user.get_id()},
"parents": [
{"entityType": get_entity_type(AvpEntities.ROLE), "entityId": group}
for group in user.get_groups()
],
}
role_entities = [
{"identifier": {"entityType": get_entity_type(AvpEntities.ROLE), "entityId": group}}
for group in user.get_groups()
]
return [user_entity, *role_entities]
14 changes: 13 additions & 1 deletion airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
from airflow.providers.amazon.aws.auth_manager.avp.facade import AwsAuthManagerAmazonVerifiedPermissionsFacade
from airflow.providers.amazon.aws.auth_manager.constants import (
CONF_ENABLE_KEY,
CONF_SECTION_NAME,
Expand Down Expand Up @@ -72,6 +74,10 @@ def __init__(self, appbuilder: AirflowAppBuilder) -> None:
"The AWS auth manager is currently being built. It is not finalized. It is not intended to be used yet."
)

@cached_property
def avp_facade(self):
return AwsAuthManagerAmazonVerifiedPermissionsFacade()

def get_user(self) -> AwsAuthManagerUser | None:
return session["aws_user"] if self.is_logged_in() else None

Expand Down Expand Up @@ -122,7 +128,13 @@ def is_authorized_pool(
def is_authorized_variable(
self, *, method: ResourceMethod, details: VariableDetails | None = None, user: BaseUser | None = None
) -> bool:
return self.is_logged_in()
variable_key = details.key if details else None
return self.avp_facade.is_authorized(
method=method,
entity_type=AvpEntities.VARIABLE,
user=user or self.get_user(),
entity_id=variable_key,
)

def is_authorized_view(
self,
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/amazon/aws/auth_manager/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
# Configuration keys
from __future__ import annotations

CONF_ENABLE_KEY = "enable"
CONF_SECTION_NAME = "aws_auth_manager"
CONF_CONN_ID_KEY = "conn_id"
CONF_SAML_METADATA_URL_KEY = "saml_metadata_url"
CONF_ENABLE_KEY = "enable"
CONF_AVP_POLICY_STORE_ID_KEY = "avp_policy_store_id"
3 changes: 3 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ def get_id(self) -> str:

def get_name(self) -> str:
return self.username or self.email or self.user_id

def get_groups(self):
return self.groups
44 changes: 44 additions & 0 deletions airflow/providers/amazon/aws/hooks/verified_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook

if TYPE_CHECKING:
from mypy_boto3_verifiedpermissions.client import VerifiedPermissionsClient # noqa


class VerifiedPermissionsHook(AwsGenericHook["VerifiedPermissionsClient"]):
"""
Interact with Amazon Verified Permissions.
Provide thin wrapper around :external+boto3:py:class:`boto3.client("verifiedpermissions")
<VerifiedPermissions.Client>`.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
- `Amazon Appflow API Reference <https://docs.aws.amazon.com/verifiedpermissions/latest/apireference/Welcome.html>`__
"""

def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "verifiedpermissions"
super().__init__(*args, **kwargs)
23 changes: 23 additions & 0 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ integrations:
- /docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
logo: /integration-logos/aws/AWS-Glue-DataBrew_64.png
tags: [aws]
- integration-name: Amazon Verified Permissions
external-doc-url: https://aws.amazon.com/verified-permissions/
logo: /integration-logos/aws/Amazon-Verified-Permissions.png
tags: [aws]

operators:
- integration-name: Amazon Athena
Expand Down Expand Up @@ -563,6 +567,9 @@ hooks:
- integration-name: AWS Glue DataBrew
python-modules:
- airflow.providers.amazon.aws.hooks.glue_databrew
- integration-name: Amazon Verified Permissions
python-modules:
- airflow.providers.amazon.aws.hooks.verified_permissions

triggers:
- integration-name: Amazon Web Services
Expand Down Expand Up @@ -915,6 +922,14 @@ config:
type: boolean
example: "True"
default: "False"
conn_id:
description: |
The Airflow connection (i.e. credentials) used by the AWS auth manager to make API calls to AWS
Identity Center and Amazon Verified Permissions.
version_added: 8.12.0
type: string
example: "aws_default"
default: "aws_default"
saml_metadata_url:
description: |
SAML metadata XML file provided by AWS Identity Center.
Expand All @@ -923,6 +938,14 @@ config:
type: string
example: "https://portal.sso.<region>.amazonaws.com/saml/metadata/XXXXXXXXXX"
default: ~
avp_policy_store_id:
description: |
Amazon Verified Permissions' policy store ID where all the policies defining user permissions
in Airflow are stored. Required.
version_added: 8.12.0
type: string
example: ~
default: ~

executors:
- airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor
23 changes: 11 additions & 12 deletions airflow/www/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from functools import wraps
from typing import TYPE_CHECKING, Callable, Sequence, TypeVar, cast

from flask import flash, redirect, render_template, request, url_for
from flask import flash, redirect, render_template, request
from flask_appbuilder._compat import as_unicode
from flask_appbuilder.const import (
FLAMSG_ERR_SEC_ACCESS_DENIED,
Expand Down Expand Up @@ -107,21 +107,20 @@ def wraps(self, *args, **kwargs):
_permission_name = self.method_permission_name.get(f.__name__)
if _permission_name:
permission_str = f"{PERMISSION_PREFIX}{_permission_name}"
if permission_str in self.base_permissions and self.appbuilder.sm.has_access(
action_name=permission_str,
resource_name=self.class_permission_name,
resource_pk=kwargs.get("pk"),
if (
get_auth_manager().is_logged_in()
and permission_str in self.base_permissions
and self.appbuilder.sm.has_access(
action_name=permission_str,
resource_name=self.class_permission_name,
resource_pk=kwargs.get("pk"),
)
):
return f(self, *args, **kwargs)
else:
log.warning(LOGMSG_ERR_SEC_ACCESS_DENIED.format(permission_str, self.__class__.__name__))
log.warning(LOGMSG_ERR_SEC_ACCESS_DENIED, permission_str, self.__class__.__name__)
flash(as_unicode(FLAMSG_ERR_SEC_ACCESS_DENIED), "danger")
return redirect(
url_for(
self.appbuilder.sm.auth_view.__class__.__name__ + ".login",
next=request.url,
)
)
return redirect(get_auth_manager().get_url_login(next=request.url))

f._permission_name = permission_str
return functools.update_wrapper(wraps, f)
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 3b3ebaf

Please sign in to comment.