add init and cleanup of long lived resources
Signed-off-by: Rob Howley <howley.robert@gmail.com>
robhowley committed Oct 17, 2024
1 parent ca9fb9b commit 6ca3b47
Showing 37 changed files with 507 additions and 608 deletions.
4 changes: 2 additions & 2 deletions docs/getting-started/concepts/permission.md
@@ -36,7 +36,7 @@ The permission model is based on the following components:
 The `Permission` class identifies a single permission configured on the feature store and is identified by these attributes:
 - `name`: The permission name.
 - `types`: The list of protected resource types. Defaults to all managed types, e.g. the `ALL_RESOURCE_TYPES` alias. All sub-classes are included in the resource match.
-- `name_pattern`: A regex to match the resource name. Defaults to `None`, meaning that no name filtering is applied
+- `name_patterns`: A list of regex patterns to match resource names. If any regex matches, the `Permission` policy is applied. Defaults to `[]`, meaning no name filtering is applied.
 - `required_tags`: Dictionary of key-value pairs that must match the resource tags. Defaults to `None`, meaning that no tags filtering is applied.
 - `actions`: The actions authorized by this permission. Defaults to `ALL_VALUES`, an alias defined in the `action` module.
 - `policy`: The policy to be applied to validate a client request.
@@ -95,7 +95,7 @@ The following permission grants authorization to read the offline store of all t
 Permission(
     name="reader",
     types=[FeatureView],
-    name_pattern=".*risky.*",
+    name_patterns=".*risky.*",  # accepts either `str` or `list[str]`
     policy=RoleBasedPolicy(roles=["trusted"]),
     actions=[AuthzedAction.READ_OFFLINE],
 )
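To illustrate the renamed attribute, here is a minimal sketch of a permission that matches several name patterns. The import paths are assumptions inferred from the symbols used in this diff, not confirmed by it:

```python
from feast.feature_view import FeatureView
from feast.permissions.action import AuthzedAction
from feast.permissions.permission import Permission
from feast.permissions.policy import RoleBasedPolicy

perm = Permission(
    name="reader",
    types=[FeatureView],
    # `name_patterns` accepts a list of regexes; the permission applies
    # when any one of them matches the resource name.
    name_patterns=[".*risky.*", ".*experimental.*"],
    policy=RoleBasedPolicy(roles=["trusted"]),
    actions=[AuthzedAction.READ_OFFLINE],
)
```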
3 changes: 2 additions & 1 deletion docs/reference/feast-cli-commands.md
@@ -172,9 +172,10 @@ Options:

 ```text
 +-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
-| NAME                  | TYPES       | NAME_PATTERN          | ACTIONS   | ROLES          | REQUIRED_TAGS           |
+| NAME                  | TYPES       | NAME_PATTERNS         | ACTIONS   | ROLES          | REQUIRED_TAGS           |
 +=======================+=============+=======================+===========+================+=========================+
 | reader_permission1234 | FeatureView | transformed_conv_rate | DESCRIBE  | reader         | -                       |
+|                       |             | driver_hourly_stats   | DESCRIBE  | reader         | -                       |
 +-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
 | writer_permission1234 | FeatureView | transformed_conv_rate | CREATE    | writer         | -                       |
 +-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
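For reference, the table above is the output of the permissions listing command; judging from `feast_permissions_list_command` in `sdk/python/feast/cli.py` below, a typical invocation would be:

```text
$ feast permissions list
```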
2 changes: 1 addition & 1 deletion java/serving/pom.xml
@@ -287,7 +287,7 @@
 <dependency>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro</artifactId>
-  <version>1.11.3</version>
+  <version>1.11.4</version>
 </dependency>

 <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-java-root -->
2 changes: 1 addition & 1 deletion protos/feast/core/Permission.proto
@@ -50,7 +50,7 @@ message PermissionSpec {

   repeated Type types = 3;

-  string name_pattern = 4;
+  repeated string name_patterns = 4;

   map<string, string> required_tags = 5;

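Note that the field number stays 4. For `string` fields, protobuf permits changing a singular field to `repeated` without breaking wire compatibility: a previously serialized single value parses as a one-element list. (This is a general protobuf property, not something this commit states.)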
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -1211,7 +1211,7 @@ def feast_permissions_list_command(ctx: click.Context, verbose: bool, tags: list
         headers=[
             "NAME",
             "TYPES",
-            "NAME_PATTERN",
+            "NAME_PATTERNS",
             "ACTIONS",
             "ROLES",
             "REQUIRED_TAGS",
2 changes: 1 addition & 1 deletion sdk/python/feast/cli_utils.py
@@ -196,7 +196,7 @@ def handle_not_verbose_permissions_command(
             [
                 p.name,
                 _to_multi_line([t.__name__ for t in p.types]),  # type: ignore[union-attr, attr-defined]
-                p.name_pattern,
+                _to_multi_line(p.name_patterns),
                 _to_multi_line([a.value.upper() for a in p.actions]),
                 _to_multi_line(sorted(roles)),
                 _dict_to_multi_line(p.required_tags),
2 changes: 2 additions & 0 deletions sdk/python/feast/feature_server.py
@@ -100,8 +100,10 @@ def async_refresh():
     @asynccontextmanager
     async def lifespan(app: FastAPI):
         async_refresh()
+        await store.initialize()
         yield
         stop_refresh()
+        await store.close()

     app = FastAPI(lifespan=lifespan)

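For context, FastAPI runs everything in `lifespan` before the `yield` at startup and everything after it at shutdown, which is what lets the store's clients live for the whole life of the server. A runnable sketch of the pattern, with a stand-in class replacing the real `FeatureStore`:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

class Store:
    """Stand-in for FeatureStore; only illustrates the lifecycle contract."""

    async def initialize(self) -> None:
        print("opening long-lived clients")  # e.g. connection pools

    async def close(self) -> None:
        print("closing long-lived clients")

store = Store()

@asynccontextmanager
async def lifespan(app: FastAPI):
    await store.initialize()  # runs once, before the first request is served
    yield                     # the application serves requests while parked here
    await store.close()       # runs once, at graceful shutdown

app = FastAPI(lifespan=lifespan)
```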
12 changes: 10 additions & 2 deletions sdk/python/feast/feature_store.py
@@ -713,7 +713,7 @@ def plan(
         >>> fs = FeatureStore(repo_path="project/feature_repo")
         >>> driver = Entity(name="driver_id", description="driver id")
         >>> driver_hourly_stats = FileSource(
-        ...     path="project/feature_repo/data/driver_stats.parquet",
+        ...     path="data/driver_stats.parquet",
         ...     timestamp_field="event_timestamp",
         ...     created_timestamp_column="created",
         ... )
@@ -827,7 +827,7 @@ def apply(
         >>> fs = FeatureStore(repo_path="project/feature_repo")
         >>> driver = Entity(name="driver_id", description="driver id")
         >>> driver_hourly_stats = FileSource(
-        ...     path="project/feature_repo/data/driver_stats.parquet",
+        ...     path="data/driver_stats.parquet",
         ...     timestamp_field="event_timestamp",
         ...     created_timestamp_column="created",
         ... )
@@ -2078,6 +2078,14 @@ def list_saved_datasets(
             self.project, allow_cache=allow_cache, tags=tags
         )

+    async def initialize(self) -> None:
+        """Initialize long-lived clients and/or resources needed for accessing datastores."""
+        await self._get_provider().initialize(self.config)
+
+    async def close(self) -> None:
+        """Clean up any long-lived clients and/or resources."""
+        await self._get_provider().close()
+

 def _print_materialization_log(
     start_date, end_date, num_feature_views: int, online_store: str
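A hypothetical caller of the new hooks might look like this; the `try`/`finally` mirrors the startup/shutdown bracketing the feature server gets from its lifespan handler above:

```python
import asyncio

from feast import FeatureStore

async def main() -> None:
    store = FeatureStore(repo_path="project/feature_repo")
    await store.initialize()  # create long-lived datastore clients once
    try:
        ...  # read online features, materialize, etc.
    finally:
        await store.close()  # always release clients and connections

asyncio.run(main())
```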
40 changes: 33 additions & 7 deletions sdk/python/feast/infra/offline_stores/dask.py
@@ -57,6 +57,7 @@ def __init__(
         self,
         evaluation_function: Callable,
         full_feature_names: bool,
+        repo_path: str,
         on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
         metadata: Optional[RetrievalMetadata] = None,
     ):
@@ -67,6 +68,7 @@ def __init__(
         self._full_feature_names = full_feature_names
         self._on_demand_feature_views = on_demand_feature_views or []
         self._metadata = metadata
+        self.repo_path = repo_path

     @property
     def full_feature_names(self) -> bool:
@@ -99,8 +101,13 @@ def persist(
         if not allow_overwrite and os.path.exists(storage.file_options.uri):
             raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)

+        if not Path(storage.file_options.uri).is_absolute():
+            absolute_path = Path(self.repo_path) / storage.file_options.uri
+        else:
+            absolute_path = Path(storage.file_options.uri)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            storage.file_options.uri,
+            str(absolute_path),
             storage.file_options.s3_endpoint_override,
         )

@@ -243,7 +250,9 @@ def evaluate_historical_retrieval():

                 all_join_keys = list(set(all_join_keys + join_keys))

-                df_to_join = _read_datasource(feature_view.batch_source)
+                df_to_join = _read_datasource(
+                    feature_view.batch_source, config.repo_path
+                )

                 df_to_join, timestamp_field = _field_mapping(
                     df_to_join,
@@ -297,6 +306,7 @@ def evaluate_historical_retrieval():
                 min_event_timestamp=entity_df_event_timestamp_range[0],
                 max_event_timestamp=entity_df_event_timestamp_range[1],
             ),
+            repo_path=str(config.repo_path),
         )
         return job

@@ -316,7 +326,7 @@ def pull_latest_from_table_or_query(

         # Create lazy function that is only called from the RetrievalJob object
         def evaluate_offline_job():
-            source_df = _read_datasource(data_source)
+            source_df = _read_datasource(data_source, config.repo_path)

             source_df = _normalize_timestamp(
                 source_df, timestamp_field, created_timestamp_column
@@ -377,6 +387,7 @@ def evaluate_offline_job():
         return DaskRetrievalJob(
             evaluation_function=evaluate_offline_job,
             full_feature_names=False,
+            repo_path=str(config.repo_path),
         )

     @staticmethod
@@ -420,8 +431,13 @@ def write_logged_features(
         # Since this code will be mostly used from Go-created thread, it's better to avoid producing new threads
         data = pyarrow.parquet.read_table(data, use_threads=False, pre_buffer=False)

+        if config.repo_path is not None and not Path(destination.path).is_absolute():
+            absolute_path = config.repo_path / destination.path
+        else:
+            absolute_path = Path(destination.path)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            destination.path,
+            str(absolute_path),
             destination.s3_endpoint_override,
         )

@@ -456,8 +472,14 @@ def offline_write_batch(
             )

         file_options = feature_view.batch_source.file_options
+
+        if config.repo_path is not None and not Path(file_options.uri).is_absolute():
+            absolute_path = config.repo_path / file_options.uri
+        else:
+            absolute_path = Path(file_options.uri)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            file_options.uri, file_options.s3_endpoint_override
+            str(absolute_path), file_options.s3_endpoint_override
         )
         prev_table = pyarrow.parquet.read_table(
             path, filesystem=filesystem, memory_map=True
@@ -493,7 +515,7 @@ def _get_entity_df_event_timestamp_range(
     )


-def _read_datasource(data_source) -> dd.DataFrame:
+def _read_datasource(data_source, repo_path) -> dd.DataFrame:
     storage_options = (
         {
             "client_kwargs": {
@@ -504,8 +526,12 @@ def _read_datasource(data_source) -> dd.DataFrame:
         else None
     )

+    if not Path(data_source.path).is_absolute():
+        path = repo_path / data_source.path
+    else:
+        path = data_source.path
     return dd.read_parquet(
-        data_source.path,
+        path,
         storage_options=storage_options,
     )

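The same resolve-relative-paths-against-the-repo-root logic recurs in `persist`, `write_logged_features`, `offline_write_batch`, and `_read_datasource` above. Distilled into a standalone sketch (the helper name is hypothetical, and POSIX paths are assumed):

```python
from pathlib import Path

def resolve_against_repo(path: str, repo_path: str) -> str:
    """Leave absolute paths alone; anchor relative ones at the repo root."""
    p = Path(path)
    return str(p if p.is_absolute() else Path(repo_path) / p)

# Relative sources now resolve against the feature repo rather than the
# process's current working directory.
assert resolve_against_repo("data/driver_stats.parquet", "/repo") == "/repo/data/driver_stats.parquet"
assert resolve_against_repo("/tmp/x.parquet", "/repo") == "/tmp/x.parquet"
```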
17 changes: 14 additions & 3 deletions sdk/python/feast/infra/offline_stores/duckdb.py
@@ -27,7 +27,7 @@
 from feast.repo_config import FeastConfigBaseModel, RepoConfig


-def _read_data_source(data_source: DataSource) -> Table:
+def _read_data_source(data_source: DataSource, repo_path: str) -> Table:
     assert isinstance(data_source, FileSource)

     if isinstance(data_source.file_format, ParquetFormat):
@@ -43,21 +43,32 @@ def _write_data_source(
 def _write_data_source(
     table: Table,
     data_source: DataSource,
+    repo_path: str,
     mode: str = "append",
     allow_overwrite: bool = False,
 ):
     assert isinstance(data_source, FileSource)

     file_options = data_source.file_options

-    if mode == "overwrite" and not allow_overwrite and os.path.exists(file_options.uri):
+    if not Path(file_options.uri).is_absolute():
+        absolute_path = Path(repo_path) / file_options.uri
+    else:
+        absolute_path = Path(file_options.uri)
+
+    if (
+        mode == "overwrite"
+        and not allow_overwrite
+        and os.path.exists(str(absolute_path))
+    ):
         raise SavedDatasetLocationAlreadyExists(location=file_options.uri)

     if isinstance(data_source.file_format, ParquetFormat):
         if mode == "overwrite":
             table = table.to_pyarrow()

         filesystem, path = FileSource.create_filesystem_and_path(
-            file_options.uri,
+            str(absolute_path),
             file_options.s3_endpoint_override,
         )

11 changes: 10 additions & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
@@ -1,3 +1,4 @@
+from pathlib import Path
 from typing import Callable, Dict, Iterable, List, Optional, Tuple

 import pyarrow
@@ -154,8 +155,16 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
     def get_table_column_names_and_types(
         self, config: RepoConfig
     ) -> Iterable[Tuple[str, str]]:
+        if (
+            config.repo_path is not None
+            and not Path(self.file_options.uri).is_absolute()
+        ):
+            absolute_path = config.repo_path / self.file_options.uri
+        else:
+            absolute_path = Path(self.file_options.uri)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            self.path, self.file_options.s3_endpoint_override
+            str(absolute_path), self.file_options.s3_endpoint_override
         )

         # TODO why None check necessary
(Diff truncated: the remaining changed files were not loaded.)
