Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added permission assert check for registry server, offline server, online server functions #25

Merged
merged 1 commit into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 77 additions & 7 deletions sdk/python/feast/feature_server.py
redhatHameed marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from feast import proto_json, utils
from feast.constants import DEFAULT_FEATURE_SERVER_REGISTRY_TTL
from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.errors import FeatureViewNotFoundException, PushSourceNotFoundException
from feast.permissions.action import WRITE, AuthzedAction
from feast.permissions.security_manager import assert_permissions


# TODO: deprecate this in favor of push features
Expand Down Expand Up @@ -86,19 +88,40 @@ async def get_body(request: Request):
def get_online_features(body=Depends(get_body)):
try:
body = json.loads(body)
full_feature_names = body.get("full_feature_names", False)
entity_rows = body["entities"]
# Initialize parameters for FeatureStore.get_online_features(...) call
if "feature_service" in body:
features = store.get_feature_service(
feature_service = store.get_feature_service(
body["feature_service"], allow_cache=True
)
assert_permissions(
redhatHameed marked this conversation as resolved.
Show resolved Hide resolved
resource=feature_service, actions=[AuthzedAction.QUERY_ONLINE]
)
features = feature_service
else:
features = body["features"]

full_feature_names = body.get("full_feature_names", False)
all_feature_views, all_on_demand_feature_views = (
utils._get_feature_views_to_use(
store.registry,
store.project,
features,
allow_cache=True,
hide_dummy_entity=False,
)
)
for feature_view in all_feature_views:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not going to do this right now, but we probably need an assert function accepting a list of resources

assert_permissions(
resource=feature_view, actions=[AuthzedAction.QUERY_ONLINE]
)
for od_feature_view in all_on_demand_feature_views:
assert_permissions(
resource=od_feature_view, actions=[AuthzedAction.QUERY_ONLINE]
)

response_proto = store.get_online_features(
features=features,
entity_rows=body["entities"],
entity_rows=entity_rows,
full_feature_names=full_feature_names,
).proto

Expand All @@ -117,16 +140,41 @@ def push(body=Depends(get_body)):
try:
request = PushFeaturesRequest(**json.loads(body))
df = pd.DataFrame(request.df)
actions = []
if request.to == "offline":
to = PushMode.OFFLINE
actions = [AuthzedAction.WRITE_OFFLINE]
elif request.to == "online":
to = PushMode.ONLINE
actions = [AuthzedAction.WRITE_ONLINE]
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
actions = WRITE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
)

from feast.data_source import PushSource

all_fvs = store.list_feature_views(
allow_cache=request.allow_registry_cache
) + store.list_stream_feature_views(
allow_cache=request.allow_registry_cache
)
fvs_with_push_sources = {
fv
for fv in all_fvs
if (
fv.stream_source is not None
and isinstance(fv.stream_source, PushSource)
and fv.stream_source.name == request.push_source_name
)
}

redhatHameed marked this conversation as resolved.
Show resolved Hide resolved
for feature_view in fvs_with_push_sources:
assert_permissions(resource=feature_view, actions=actions)

store.push(
push_source_name=request.push_source_name,
df=df,
Expand All @@ -149,10 +197,24 @@ def write_to_online_store(body=Depends(get_body)):
try:
request = WriteToFeatureStoreRequest(**json.loads(body))
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
try:
feature_view = store.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = store.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)

assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.write_to_online_store(
feature_view_name=request.feature_view_name,
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
allow_registry_cache=allow_registry_cache,
)
except Exception as e:
# Print the original exception on the server side
Expand All @@ -168,6 +230,10 @@ def health():
def materialize(body=Depends(get_body)):
try:
request = MaterializeRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
Expand All @@ -183,6 +249,10 @@ def materialize(body=Depends(get_body)):
def materialize_incremental(body=Depends(get_body)):
try:
request = MaterializeIncrementalRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
)
Expand Down
36 changes: 31 additions & 5 deletions sdk/python/feast/offline_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import traceback
from datetime import datetime
from typing import Any, Dict, List
from typing import Any, Dict, List, cast

import pyarrow as pa
import pyarrow.flight as fl
Expand All @@ -12,6 +12,8 @@
from feast.feature_logging import FeatureServiceLoggingSource
from feast.feature_view import DUMMY_ENTITY_NAME
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.permissions.action import AuthzedAction
from feast.permissions.security_manager import assert_permissions
from feast.saved_dataset import SavedDatasetStorage

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -217,7 +219,15 @@ def offline_write_batch(self, command: dict, key: str):
assert len(feature_views) == 1, "incorrect feature view"
table = self.flights[key]
self.offline_store.offline_write_batch(
self.store.config, feature_views[0], table, command["progress"]
self.store.config,
cast(
FeatureView,
assert_permissions(
feature_views[0], actions=[AuthzedAction.WRITE_OFFLINE]
),
),
table,
command["progress"],
)

def _validate_write_logged_features_parameters(self, command: dict):
Expand All @@ -234,6 +244,10 @@ def write_logged_features(self, command: dict, key: str):
feature_service.logging_config is not None
), "feature service must have logging_config set"

assert_permissions(
resource=feature_service,
actions=[AuthzedAction.WRITE_OFFLINE],
)
self.offline_store.write_logged_features(
config=self.store.config,
data=table,
Expand All @@ -260,10 +274,12 @@ def _validate_pull_all_from_table_or_query_parameters(self, command: dict):

def pull_all_from_table_or_query(self, command: dict):
self._validate_pull_all_from_table_or_query_parameters(command)
data_source = self.store.get_data_source(command["data_source_name"])
assert_permissions(data_source, actions=[AuthzedAction.QUERY_OFFLINE])
redhatHameed marked this conversation as resolved.
Show resolved Hide resolved

return self.offline_store.pull_all_from_table_or_query(
self.store.config,
self.store.get_data_source(command["data_source_name"]),
data_source,
command["join_key_columns"],
command["feature_name_columns"],
command["timestamp_field"],
Expand All @@ -287,10 +303,11 @@ def _validate_pull_latest_from_table_or_query_parameters(self, command: dict):

def pull_latest_from_table_or_query(self, command: dict):
self._validate_pull_latest_from_table_or_query_parameters(command)

data_source = self.store.get_data_source(command["data_source_name"])
assert_permissions(resource=data_source, actions=[AuthzedAction.QUERY_OFFLINE])
return self.offline_store.pull_latest_from_table_or_query(
self.store.config,
self.store.get_data_source(command["data_source_name"]),
data_source,
command["join_key_columns"],
command["feature_name_columns"],
command["timestamp_field"],
Expand Down Expand Up @@ -343,6 +360,11 @@ def get_historical_features(self, command: dict, key: str):
project=project,
)

for feature_view in feature_views:
assert_permissions(
redhatHameed marked this conversation as resolved.
Show resolved Hide resolved
resource=feature_view, actions=[AuthzedAction.QUERY_OFFLINE]
)

retJob = self.offline_store.get_historical_features(
config=self.store.config,
feature_views=feature_views,
Expand Down Expand Up @@ -377,6 +399,10 @@ def persist(self, command: dict, key: str):
raise NotImplementedError

data_source = self.store.get_data_source(command["data_source_name"])
assert_permissions(
resource=data_source,
actions=[AuthzedAction.WRITE_OFFLINE],
)
storage = SavedDatasetStorage.from_data_source(data_source)
ret_job.persist(storage, command["allow_overwrite"], command["timeout"])
except Exception as e:
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/permissions/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,12 @@ class AuthzedAction(enum.Enum):
AuthzedAction.WRITE_OFFLINE,
AuthzedAction.WRITE_ONLINE,
]


# Alias for CRUD actions
CRUD = [
AuthzedAction.CREATE,
AuthzedAction.READ,
AuthzedAction.UPDATE,
AuthzedAction.DELETE,
]
Loading
Loading