From 729c874e8c30719f23ad287d3cb84f1d654274ec Mon Sep 17 00:00:00 2001 From: Theodor Mihalache <84387487+tmihalac@users.noreply.github.com> Date: Thu, 29 Aug 2024 12:10:43 -0400 Subject: [PATCH] feat: Intra server to server communication (#4433) Intra server communication Signed-off-by: Theodor Mihalache --- .../templates/deployment.yaml | 2 + .../auth/kubernetes_token_parser.py | 11 +- .../permissions/auth/oidc_token_parser.py | 26 +- .../client/kubernetes_auth_client_manager.py | 11 + .../oidc_authentication_client_manager.py | 11 + .../feast/permissions/security_manager.py | 25 +- .../permissions/auth/test_token_parser.py | 147 +++++++++++- .../tests/unit/permissions/test_decorator.py | 2 +- .../unit/permissions/test_security_manager.py | 227 +++++++++++++++--- 9 files changed, 417 insertions(+), 45 deletions(-) diff --git a/infra/charts/feast-feature-server/templates/deployment.yaml b/infra/charts/feast-feature-server/templates/deployment.yaml index a550433db5..8dddeed6fd 100644 --- a/infra/charts/feast-feature-server/templates/deployment.yaml +++ b/infra/charts/feast-feature-server/templates/deployment.yaml @@ -36,6 +36,8 @@ spec: env: - name: FEATURE_STORE_YAML_BASE64 value: {{ .Values.feature_store_yaml_base64 }} + - name: INTRA_COMMUNICATION_BASE64 + value: {{ "intra-server-communication" | b64enc }} command: {{- if eq .Values.feast_mode "offline" }} - "feast" diff --git a/sdk/python/feast/permissions/auth/kubernetes_token_parser.py b/sdk/python/feast/permissions/auth/kubernetes_token_parser.py index c16e5232fb..7724163e5f 100644 --- a/sdk/python/feast/permissions/auth/kubernetes_token_parser.py +++ b/sdk/python/feast/permissions/auth/kubernetes_token_parser.py @@ -1,4 +1,5 @@ import logging +import os import jwt from kubernetes import client, config @@ -41,10 +42,14 @@ async def user_details_from_access_token(self, access_token: str) -> User: current_user = f"{sa_namespace}:{sa_name}" logging.info(f"Received request from {sa_name} in {sa_namespace}") - roles = self.get_roles(sa_namespace, sa_name) - logging.info(f"SA roles are: {roles}") + intra_communication_base64 = os.getenv("INTRA_COMMUNICATION_BASE64") + if sa_name is not None and sa_name == intra_communication_base64: + return User(username=sa_name, roles=[]) + else: + roles = self.get_roles(sa_namespace, sa_name) + logging.info(f"SA roles are: {roles}") - return User(username=current_user, roles=roles) + return User(username=current_user, roles=roles) def get_roles(self, namespace: str, service_account_name: str) -> list[str]: """ diff --git a/sdk/python/feast/permissions/auth/oidc_token_parser.py b/sdk/python/feast/permissions/auth/oidc_token_parser.py index fce9fdcbb2..28273e8c10 100644 --- a/sdk/python/feast/permissions/auth/oidc_token_parser.py +++ b/sdk/python/feast/permissions/auth/oidc_token_parser.py @@ -1,4 +1,6 @@ import logging +import os +from typing import Optional from unittest.mock import Mock import jwt @@ -34,7 +36,7 @@ def __init__(self, auth_config: OidcAuthConfig): async def _validate_token(self, access_token: str): """ - Validate the token extracted from the headrer of the user request against the OAuth2 server. + Validate the token extracted from the header of the user request against the OAuth2 server. """ # FastAPI's OAuth2AuthorizationCodeBearer requires a Request type but actually uses only the headers field # https://github.com/tiangolo/fastapi/blob/eca465f4c96acc5f6a22e92fd2211675ca8a20c8/fastapi/security/oauth2.py#L380 @@ -60,6 +62,11 @@ async def user_details_from_access_token(self, access_token: str) -> User: AuthenticationError if any error happens. """ + # check if intra server communication + user = self._get_intra_comm_user(access_token) + if user: + return user + try: await self._validate_token(access_token) logger.info("Validated token") @@ -108,3 +115,20 @@ async def user_details_from_access_token(self, access_token: str) -> User: except jwt.exceptions.InvalidTokenError: logger.exception("Exception while parsing the token:") raise AuthenticationError("Invalid token.") + + def _get_intra_comm_user(self, access_token: str) -> Optional[User]: + intra_communication_base64 = os.getenv("INTRA_COMMUNICATION_BASE64") + + if intra_communication_base64: + decoded_token = jwt.decode( + access_token, options={"verify_signature": False} + ) + if "preferred_username" in decoded_token: + preferred_username: str = decoded_token["preferred_username"] + if ( + preferred_username is not None + and preferred_username == intra_communication_base64 + ): + return User(username=preferred_username, roles=[]) + + return None diff --git a/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py b/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py index 1ca3c5a2ae..9957ff93a7 100644 --- a/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py +++ b/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py @@ -1,6 +1,8 @@ import logging import os +import jwt + from feast.permissions.auth_model import KubernetesAuthConfig from feast.permissions.client.auth_client_manager import AuthenticationClientManager @@ -13,6 +15,15 @@ def __init__(self, auth_config: KubernetesAuthConfig): self.token_file_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" def get_token(self): + intra_communication_base64 = os.getenv("INTRA_COMMUNICATION_BASE64") + # If intra server communication call + if intra_communication_base64: + payload = { + "sub": f":::{intra_communication_base64}", # Subject claim + } + + return jwt.encode(payload, "") + try: token = self._read_token_from_file() return token diff --git a/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py b/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py index 6744a1d2ad..0f99cea86f 100644 --- a/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py +++ b/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py @@ -1,5 +1,7 @@ import logging +import os +import jwt import requests from feast.permissions.auth_model import OidcAuthConfig @@ -14,6 +16,15 @@ def __init__(self, auth_config: OidcAuthConfig): self.auth_config = auth_config def get_token(self): + intra_communication_base64 = os.getenv("INTRA_COMMUNICATION_BASE64") + # If intra server communication call + if intra_communication_base64: + payload = { + "preferred_username": f"{intra_communication_base64}", # Subject claim + } + + return jwt.encode(payload, "") + # Fetch the token endpoint from the discovery URL token_endpoint = OIDCDiscoveryService( self.auth_config.auth_discovery_url diff --git a/sdk/python/feast/permissions/security_manager.py b/sdk/python/feast/permissions/security_manager.py index 29c0e06753..c00a3d8853 100644 --- a/sdk/python/feast/permissions/security_manager.py +++ b/sdk/python/feast/permissions/security_manager.py @@ -1,4 +1,5 @@ import logging +import os from contextvars import ContextVar from typing import Callable, List, Optional, Union @@ -110,6 +111,10 @@ def assert_permissions_to_update( Raises: FeastPermissionError: If the current user is not authorized to execute all the requested actions on the given resource or on the existing one. """ + sm = get_security_manager() + if not is_auth_necessary(sm): + return resource + actions = [AuthzedAction.DESCRIBE, AuthzedAction.UPDATE] try: existing_resource = getter( @@ -142,10 +147,11 @@ def assert_permissions( Raises: FeastPermissionError: If the current user is not authorized to execute the requested actions on the given resources. """ + sm = get_security_manager() - if sm is None: + if not is_auth_necessary(sm): return resource - return sm.assert_permissions( + return sm.assert_permissions( # type: ignore[union-attr] resources=[resource], actions=actions, filter_only=False )[0] @@ -165,10 +171,11 @@ def permitted_resources( Returns: list[FeastObject]]: A filtered list of the permitted resources, possibly empty. """ + sm = get_security_manager() - if sm is None: + if not is_auth_necessary(sm): return resources - return sm.assert_permissions(resources=resources, actions=actions, filter_only=True) + return sm.assert_permissions(resources=resources, actions=actions, filter_only=True) # type: ignore[union-attr] """ @@ -201,3 +208,13 @@ def no_security_manager(): global _sm _sm = None + + +def is_auth_necessary(sm: Optional[SecurityManager]) -> bool: + intra_communication_base64 = os.getenv("INTRA_COMMUNICATION_BASE64") + + return ( + sm is not None + and sm.current_user is not None + and sm.current_user.username != intra_communication_base64 + ) diff --git a/sdk/python/tests/unit/permissions/auth/test_token_parser.py b/sdk/python/tests/unit/permissions/auth/test_token_parser.py index cb153a17c9..bac2103b4f 100644 --- a/sdk/python/tests/unit/permissions/auth/test_token_parser.py +++ b/sdk/python/tests/unit/permissions/auth/test_token_parser.py @@ -1,6 +1,6 @@ -# test_token_validator.py - import asyncio +import os +from unittest import mock from unittest.mock import MagicMock, patch import assertpy @@ -70,6 +70,75 @@ def test_oidc_token_validation_failure(mock_oauth2, oidc_config): ) +@mock.patch.dict(os.environ, {"INTRA_COMMUNICATION_BASE64": "test1234"}) +@pytest.mark.parametrize( + "intra_communication_val, is_intra_server", + [ + ("test1234", True), + ("my-name", False), + ], +) +def test_oidc_inter_server_comm( + intra_communication_val, is_intra_server, oidc_config, monkeypatch +): + async def mock_oath2(self, request): + return "OK" + + monkeypatch.setattr( + "feast.permissions.auth.oidc_token_parser.OAuth2AuthorizationCodeBearer.__call__", + mock_oath2, + ) + signing_key = MagicMock() + signing_key.key = "a-key" + monkeypatch.setattr( + "feast.permissions.auth.oidc_token_parser.PyJWKClient.get_signing_key_from_jwt", + lambda self, access_token: signing_key, + ) + + user_data = { + "preferred_username": f"{intra_communication_val}", + } + + if not is_intra_server: + user_data["resource_access"] = {_CLIENT_ID: {"roles": ["reader", "writer"]}} + + monkeypatch.setattr( + "feast.permissions.oidc_service.OIDCDiscoveryService._fetch_discovery_data", + lambda self, *args, **kwargs: { + "authorization_endpoint": "https://localhost:8080/realms/master/protocol/openid-connect/auth", + "token_endpoint": "https://localhost:8080/realms/master/protocol/openid-connect/token", + "jwks_uri": "https://localhost:8080/realms/master/protocol/openid-connect/certs", + }, + ) + + monkeypatch.setattr( + "feast.permissions.auth.oidc_token_parser.jwt.decode", + lambda self, *args, **kwargs: user_data, + ) + + access_token = "aaa-bbb-ccc" + token_parser = OidcTokenParser(auth_config=oidc_config) + user = asyncio.run( + token_parser.user_details_from_access_token(access_token=access_token) + ) + + if is_intra_server: + assertpy.assert_that(user).is_not_none() + assertpy.assert_that(user.username).is_equal_to(intra_communication_val) + assertpy.assert_that(user.roles).is_equal_to([]) + else: + assertpy.assert_that(user).is_not_none() + assertpy.assert_that(user).is_type_of(User) + if isinstance(user, User): + assertpy.assert_that(user.username).is_equal_to("my-name") + assertpy.assert_that(user.roles.sort()).is_equal_to( + ["reader", "writer"].sort() + ) + assertpy.assert_that(user.has_matching_role(["reader"])).is_true() + assertpy.assert_that(user.has_matching_role(["writer"])).is_true() + assertpy.assert_that(user.has_matching_role(["updater"])).is_false() + + # TODO RBAC: Move role bindings to a reusable fixture @patch("feast.permissions.auth.kubernetes_token_parser.config.load_incluster_config") @patch("feast.permissions.auth.kubernetes_token_parser.jwt.decode") @@ -127,3 +196,77 @@ def test_k8s_token_validation_failure(mock_jwt, mock_config): asyncio.run( token_parser.user_details_from_access_token(access_token=access_token) ) + + +@mock.patch.dict(os.environ, {"INTRA_COMMUNICATION_BASE64": "test1234"}) +@pytest.mark.parametrize( + "intra_communication_val, is_intra_server", + [ + ("test1234", True), + ("my-name", False), + ], +) +def test_k8s_inter_server_comm( + intra_communication_val, + is_intra_server, + oidc_config, + request, + rolebindings, + clusterrolebindings, + monkeypatch, +): + if is_intra_server: + subject = f":::{intra_communication_val}" + else: + sa_name = request.getfixturevalue("sa_name") + namespace = request.getfixturevalue("namespace") + subject = f"system:serviceaccount:{namespace}:{sa_name}" + rolebindings = request.getfixturevalue("rolebindings") + clusterrolebindings = request.getfixturevalue("clusterrolebindings") + + monkeypatch.setattr( + "feast.permissions.auth.kubernetes_token_parser.client.RbacAuthorizationV1Api.list_namespaced_role_binding", + lambda *args, **kwargs: rolebindings["items"], + ) + monkeypatch.setattr( + "feast.permissions.auth.kubernetes_token_parser.client.RbacAuthorizationV1Api.list_cluster_role_binding", + lambda *args, **kwargs: clusterrolebindings["items"], + ) + monkeypatch.setattr( + "feast.permissions.client.kubernetes_auth_client_manager.KubernetesAuthClientManager.get_token", + lambda self: "my-token", + ) + + monkeypatch.setattr( + "feast.permissions.auth.kubernetes_token_parser.config.load_incluster_config", + lambda: None, + ) + + monkeypatch.setattr( + "feast.permissions.auth.kubernetes_token_parser.jwt.decode", + lambda *args, **kwargs: {"sub": subject}, + ) + + roles = rolebindings["roles"] + croles = clusterrolebindings["roles"] + + access_token = "aaa-bbb-ccc" + token_parser = KubernetesTokenParser() + user = asyncio.run( + token_parser.user_details_from_access_token(access_token=access_token) + ) + + if is_intra_server: + assertpy.assert_that(user).is_not_none() + assertpy.assert_that(user.username).is_equal_to(intra_communication_val) + assertpy.assert_that(user.roles).is_equal_to([]) + else: + assertpy.assert_that(user).is_type_of(User) + if isinstance(user, User): + assertpy.assert_that(user.username).is_equal_to(f"{namespace}:{sa_name}") + assertpy.assert_that(user.roles.sort()).is_equal_to((roles + croles).sort()) + for r in roles: + assertpy.assert_that(user.has_matching_role([r])).is_true() + for cr in croles: + assertpy.assert_that(user.has_matching_role([cr])).is_true() + assertpy.assert_that(user.has_matching_role(["foo"])).is_false() diff --git a/sdk/python/tests/unit/permissions/test_decorator.py b/sdk/python/tests/unit/permissions/test_decorator.py index 92db72c93d..f434301a2c 100644 --- a/sdk/python/tests/unit/permissions/test_decorator.py +++ b/sdk/python/tests/unit/permissions/test_decorator.py @@ -7,7 +7,7 @@ @pytest.mark.parametrize( "username, can_read, can_write", [ - (None, False, False), + (None, True, True), ("r", True, False), ("w", False, True), ("rw", True, True), diff --git a/sdk/python/tests/unit/permissions/test_security_manager.py b/sdk/python/tests/unit/permissions/test_security_manager.py index d403c8123b..11b8dfb88e 100644 --- a/sdk/python/tests/unit/permissions/test_security_manager.py +++ b/sdk/python/tests/unit/permissions/test_security_manager.py @@ -9,18 +9,107 @@ assert_permissions_to_update, permitted_resources, ) +from feast.permissions.user import User @pytest.mark.parametrize( - "username, requested_actions, allowed, allowed_single, raise_error_in_assert, raise_error_in_permit", + "username, requested_actions, allowed, allowed_single, raise_error_in_assert, raise_error_in_permit, intra_communication_flag", [ - (None, [], False, [False, False], [True, True], False), - ("r", [AuthzedAction.DESCRIBE], True, [True, True], [False, False], False), - ("r", [AuthzedAction.UPDATE], False, [False, False], [True, True], False), - ("w", [AuthzedAction.DESCRIBE], False, [False, False], [True, True], False), - ("w", [AuthzedAction.UPDATE], False, [True, True], [False, False], False), - ("rw", [AuthzedAction.DESCRIBE], False, [True, True], [False, False], False), - ("rw", [AuthzedAction.UPDATE], False, [True, True], [False, False], False), + (None, [], True, [True, True], [False, False], False, False), + (None, [], True, [True, True], [False, False], False, True), + ( + "r", + [AuthzedAction.DESCRIBE], + True, + [True, True], + [False, False], + False, + False, + ), + ( + "r", + [AuthzedAction.DESCRIBE], + True, + [True, True], + [False, False], + False, + True, + ), + ("server_intra_com_val", [], True, [True, True], [False, False], False, True), + ( + "r", + [AuthzedAction.UPDATE], + False, + [False, False], + [True, True], + False, + False, + ), + ("r", [AuthzedAction.UPDATE], True, [True, True], [False, False], False, True), + ( + "w", + [AuthzedAction.DESCRIBE], + False, + [False, False], + [True, True], + False, + False, + ), + ( + "w", + [AuthzedAction.DESCRIBE], + True, + [True, True], + [True, True], + False, + True, + ), + ( + "w", + [AuthzedAction.UPDATE], + False, + [True, True], + [False, False], + False, + False, + ), + ("w", [AuthzedAction.UPDATE], False, [True, True], [False, False], False, True), + ( + "rw", + [AuthzedAction.DESCRIBE], + False, + [True, True], + [False, False], + False, + False, + ), + ( + "rw", + [AuthzedAction.DESCRIBE], + False, + [True, True], + [False, False], + False, + True, + ), + ( + "rw", + [AuthzedAction.UPDATE], + False, + [True, True], + [False, False], + False, + False, + ), + ( + "rw", + [AuthzedAction.UPDATE], + False, + [True, True], + [False, False], + False, + True, + ), ( "rw", [AuthzedAction.DESCRIBE, AuthzedAction.UPDATE], @@ -28,6 +117,16 @@ [False, False], [True, True], True, + False, + ), + ( + "rw", + [AuthzedAction.DESCRIBE, AuthzedAction.UPDATE], + True, + [True, True], + [False, False], + False, + True, ), ( "special", @@ -36,6 +135,16 @@ [False, True], [True, False], True, + False, + ), + ( + "admin", + [AuthzedAction.DESCRIBE, AuthzedAction.UPDATE], + True, + [True, True], + [False, False], + False, + True, ), ( "special", @@ -44,6 +153,16 @@ [False, False], [True, True], True, + False, + ), + ( + "admin", + READ + [AuthzedAction.UPDATE], + True, + [True, True], + [False, False], + False, + True, ), ], ) @@ -57,13 +176,21 @@ def test_access_SecuredFeatureView( allowed_single, raise_error_in_assert, raise_error_in_permit, + intra_communication_flag, + monkeypatch, ): sm = security_manager - resources = feature_views - user = users.get(username) sm.set_current_user(user) + if intra_communication_flag: + monkeypatch.setenv("INTRA_COMMUNICATION_BASE64", "server_intra_com_val") + sm.set_current_user(User("server_intra_com_val", [])) + else: + monkeypatch.delenv("INTRA_COMMUNICATION_BASE64", False) + + resources = feature_views + result = [] if raise_error_in_permit: with pytest.raises(FeastPermissionError): @@ -90,16 +217,24 @@ def test_access_SecuredFeatureView( @pytest.mark.parametrize( - "username, allowed", + "username, allowed, intra_communication_flag", [ - (None, False), - ("r", False), - ("w", False), - ("rw", False), - ("special", False), - ("updater", False), - ("creator", True), - ("admin", True), + (None, True, False), + (None, True, True), + ("r", False, False), + ("r", True, True), + ("w", False, False), + ("w", True, True), + ("rw", False, False), + ("rw", True, True), + ("special", False, False), + ("special", True, True), + ("updater", False, False), + ("updater", True, True), + ("creator", True, False), + ("creator", True, True), + ("admin", True, False), + ("admin", True, True), ], ) def test_create_entity( @@ -107,15 +242,23 @@ def test_create_entity( users, username, allowed, + intra_communication_flag, + monkeypatch, ): sm = security_manager + user = users.get(username) + sm.set_current_user(user) + + if intra_communication_flag: + monkeypatch.setenv("INTRA_COMMUNICATION_BASE64", "server_intra_com_val") + sm.set_current_user(User("server_intra_com_val", [])) + else: + monkeypatch.delenv("INTRA_COMMUNICATION_BASE64", False) + entity = Entity( name="", ) - user = users.get(username) - sm.set_current_user(user) - def getter(name: str, project: str, allow_cache: bool): raise FeastObjectNotFoundException() @@ -130,16 +273,24 @@ def getter(name: str, project: str, allow_cache: bool): @pytest.mark.parametrize( - "username, allowed", + "username, allowed, intra_communication_flag", [ - (None, False), - ("r", False), - ("w", False), - ("rw", False), - ("special", False), - ("updater", True), - ("creator", False), - ("admin", True), + (None, True, False), + (None, True, True), + ("r", False, False), + ("r", True, True), + ("w", False, False), + ("w", True, True), + ("rw", False, False), + ("rw", True, True), + ("special", False, False), + ("special", True, True), + ("updater", True, False), + ("updater", True, True), + ("creator", False, False), + ("creator", True, True), + ("admin", True, False), + ("admin", True, True), ], ) def test_update_entity( @@ -147,15 +298,23 @@ def test_update_entity( users, username, allowed, + intra_communication_flag, + monkeypatch, ): sm = security_manager + user = users.get(username) + sm.set_current_user(user) + + if intra_communication_flag: + monkeypatch.setenv("INTRA_COMMUNICATION_BASE64", "server_intra_com_val") + sm.set_current_user(User("server_intra_com_val", [])) + else: + monkeypatch.delenv("INTRA_COMMUNICATION_BASE64", False) + entity = Entity( name="", ) - user = users.get(username) - sm.set_current_user(user) - def getter(name: str, project: str, allow_cache: bool): return entity