Commit

Merge branch 'feast-dev:master' into master

emgeee authored Oct 18, 2024
2 parents f99c0c9 + 18d0eaa commit 771760f
Showing 42 changed files with 323 additions and 598 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/operator_pr.yml
@@ -0,0 +1,18 @@
name: operator-pr

on: [pull_request]
jobs:
  operator-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install Go
        uses: actions/setup-go@v2
        with:
          go-version: 1.21.x
      - name: Operator tests
        run: |
          cd infra/feast-operator/
          make test
      - name: After code formatting, check for uncommitted differences
        run: git diff --exit-code infra/feast-operator
1 change: 1 addition & 0 deletions community/ADOPTERS.md
@@ -11,5 +11,6 @@ alphabetical order.
 | Get Ground | Zhiling Chen | zhilingc |
 | Gojek | Pradithya Aria Pura | pradithya |
 | Twitter | David Liu | mavysavydav|
+| SeatGeek | Rob Howley | robhowley |
 | Shopify | Matt Delacour | MattDelac |
 | Snowflake | Miles Adkins | sfc-gh-madkins |
4 changes: 2 additions & 2 deletions docs/getting-started/concepts/permission.md
@@ -36,7 +36,7 @@ The permission model is based on the following components:
 The `Permission` class identifies a single permission configured on the feature store and is identified by these attributes:
 - `name`: The permission name.
 - `types`: The list of protected resource types. Defaults to all managed types, e.g. the `ALL_RESOURCE_TYPES` alias. All sub-classes are included in the resource match.
-- `name_pattern`: A regex to match the resource name. Defaults to `None`, meaning that no name filtering is applied
+- `name_patterns`: A list of regex patterns to match resource names. If any regex matches, the `Permission` policy is applied. Defaults to `[]`, meaning no name filtering is applied.
 - `required_tags`: Dictionary of key-value pairs that must match the resource tags. Defaults to `None`, meaning that no tags filtering is applied.
 - `actions`: The actions authorized by this permission. Defaults to `ALL_VALUES`, an alias defined in the `action` module.
 - `policy`: The policy to be applied to validate a client request.
@@ -95,7 +95,7 @@ The following permission grants authorization to read the offline store of all t
 Permission(
     name="reader",
     types=[FeatureView],
-    name_pattern=".*risky.*",
+    name_patterns=".*risky.*",  # Accepts both `str` or `list[str]` types
     policy=RoleBasedPolicy(roles=["trusted"]),
     actions=[AuthzedAction.READ_OFFLINE],
 )
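
Because `name_patterns` now also accepts a list, a single permission can cover several naming conventions at once. A minimal sketch of the list form (the second regex is illustrative, not from this commit):

```python
from feast import FeatureView
from feast.permissions.action import AuthzedAction
from feast.permissions.permission import Permission
from feast.permissions.policy import RoleBasedPolicy

# The policy applies when ANY of the regexes matches the resource name.
reader = Permission(
    name="reader",
    types=[FeatureView],
    name_patterns=[".*risky.*", ".*fraud.*"],  # illustrative second pattern
    policy=RoleBasedPolicy(roles=["trusted"]),
    actions=[AuthzedAction.READ_OFFLINE],
)
```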
3 changes: 2 additions & 1 deletion docs/reference/feast-cli-commands.md
@@ -172,9 +172,10 @@ Options:

```text
 +-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
-| NAME                  | TYPES       | NAME_PATTERN          | ACTIONS   | ROLES          | REQUIRED_TAGS           |
+| NAME                  | TYPES       | NAME_PATTERNS         | ACTIONS   | ROLES          | REQUIRED_TAGS           |
 +=======================+=============+=======================+===========+================+=========================+
 | reader_permission1234 | FeatureView | transformed_conv_rate | DESCRIBE  | reader         | -                       |
+|                       |             | driver_hourly_stats   | DESCRIBE  | reader         | -                       |
 +-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
 | writer_permission1234 | FeatureView | transformed_conv_rate | CREATE    | writer         | -                       |
 +-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
```
2 changes: 1 addition & 1 deletion protos/feast/core/Permission.proto
@@ -50,7 +50,7 @@ message PermissionSpec {

   repeated Type types = 3;

-  string name_pattern = 4;
+  repeated string name_patterns = 4;

   map<string, string> required_tags = 5;
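
On the Python side, regenerating the stubs turns `name_patterns` into a repeated scalar container, which behaves like a mutable sequence. A hedged sketch, assuming the usual generated-module layout under `feast.protos`:

```python
from feast.protos.feast.core.Permission_pb2 import PermissionSpec

spec = PermissionSpec()
# A `repeated string` field supports list-like mutation on generated messages.
spec.name_patterns.append(".*risky.*")
spec.name_patterns.extend([".*fraud.*"])
assert list(spec.name_patterns) == [".*risky.*", ".*fraud.*"]
```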
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -1211,7 +1211,7 @@ def feast_permissions_list_command(ctx: click.Context, verbose: bool, tags: list
         headers=[
             "NAME",
             "TYPES",
-            "NAME_PATTERN",
+            "NAME_PATTERNS",
             "ACTIONS",
             "ROLES",
             "REQUIRED_TAGS",
2 changes: 1 addition & 1 deletion sdk/python/feast/cli_utils.py
@@ -196,7 +196,7 @@ def handle_not_verbose_permissions_command(
         [
             p.name,
             _to_multi_line([t.__name__ for t in p.types]),  # type: ignore[union-attr, attr-defined]
-            p.name_pattern,
+            _to_multi_line(p.name_patterns),
             _to_multi_line([a.value.upper() for a in p.actions]),
             _to_multi_line(sorted(roles)),
             _dict_to_multi_line(p.required_tags),
16 changes: 12 additions & 4 deletions sdk/python/feast/feature_server.py
@@ -10,6 +10,7 @@
 import psutil
 from dateutil import parser
 from fastapi import Depends, FastAPI, Request, Response, status
+from fastapi.concurrency import run_in_threadpool
 from fastapi.logger import logger
 from fastapi.responses import JSONResponse
 from google.protobuf.json_format import MessageToDict
@@ -112,7 +113,7 @@ async def get_body(request: Request):
     "/get-online-features",
     dependencies=[Depends(inject_user_details)],
 )
-def get_online_features(body=Depends(get_body)):
+async def get_online_features(body=Depends(get_body)):
     body = json.loads(body)
     full_feature_names = body.get("full_feature_names", False)
     entity_rows = body["entities"]
@@ -145,15 +146,22 @@ def get_online_features(body=Depends(get_body)):
             resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
         )

-    response_proto = store.get_online_features(
+    read_params = dict(
         features=features,
         entity_rows=entity_rows,
         full_feature_names=full_feature_names,
-    ).proto
+    )
+
+    if store._get_provider().async_supported.online.read:
+        response = await store.get_online_features_async(**read_params)
+    else:
+        response = await run_in_threadpool(
+            lambda: store.get_online_features(**read_params)
+        )

     # Convert the Protobuf object to JSON and return it
     return MessageToDict(
-        response_proto, preserving_proto_field_name=True, float_precision=18
+        response.proto, preserving_proto_field_name=True, float_precision=18
     )

 @app.post("/push", dependencies=[Depends(inject_user_details)])
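
The handler change above follows a general FastAPI pattern: use the provider's native async read when `async_supported` reports it, and otherwise push the blocking call onto the threadpool so the event loop stays responsive. A standalone sketch of the same dispatch pattern (not Feast code; `BACKEND_SUPPORTS_ASYNC` and the two read functions are stand-ins):

```python
import asyncio
import time

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()
BACKEND_SUPPORTS_ASYNC = False  # stand-in for a provider capability flag


def blocking_read() -> dict:
    time.sleep(0.1)  # stand-in for a synchronous online-store read
    return {"status": "ok (threadpool)"}


async def async_read() -> dict:
    await asyncio.sleep(0.1)  # stand-in for a natively async read
    return {"status": "ok (async)"}


@app.get("/read")
async def read() -> dict:
    if BACKEND_SUPPORTS_ASYNC:
        return await async_read()
    # run_in_threadpool keeps the event loop free while the sync call runs.
    return await run_in_threadpool(blocking_read)
```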
4 changes: 2 additions & 2 deletions sdk/python/feast/feature_store.py
@@ -713,7 +713,7 @@ def plan(
             >>> fs = FeatureStore(repo_path="project/feature_repo")
             >>> driver = Entity(name="driver_id", description="driver id")
             >>> driver_hourly_stats = FileSource(
-            ...     path="project/feature_repo/data/driver_stats.parquet",
+            ...     path="data/driver_stats.parquet",
             ...     timestamp_field="event_timestamp",
             ...     created_timestamp_column="created",
             ... )
@@ -827,7 +827,7 @@ def apply(
             >>> fs = FeatureStore(repo_path="project/feature_repo")
             >>> driver = Entity(name="driver_id", description="driver id")
             >>> driver_hourly_stats = FileSource(
-            ...     path="project/feature_repo/data/driver_stats.parquet",
+            ...     path="data/driver_stats.parquet",
             ...     timestamp_field="event_timestamp",
             ...     created_timestamp_column="created",
             ... )
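
These doctest edits reflect the path-resolution change elsewhere in this commit: a relative `FileSource` path is now resolved against the store's `repo_path`, so the examples no longer need to repeat the repo prefix. A hedged before/after sketch:

```python
from feast import FileSource

# Before: the example duplicated the repo prefix in the source path.
# FileSource(path="project/feature_repo/data/driver_stats.parquet", ...)

# After: a repo-relative path suffices; it is resolved against
# FeatureStore(repo_path="project/feature_repo") when the source is read.
source = FileSource(
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
```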
40 changes: 33 additions & 7 deletions sdk/python/feast/infra/offline_stores/dask.py
@@ -57,6 +57,7 @@ def __init__(
         self,
         evaluation_function: Callable,
         full_feature_names: bool,
+        repo_path: str,
         on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
         metadata: Optional[RetrievalMetadata] = None,
     ):
@@ -67,6 +68,7 @@ def __init__(
         self._full_feature_names = full_feature_names
         self._on_demand_feature_views = on_demand_feature_views or []
         self._metadata = metadata
+        self.repo_path = repo_path

     @property
     def full_feature_names(self) -> bool:
@@ -99,8 +101,13 @@ def persist(
         if not allow_overwrite and os.path.exists(storage.file_options.uri):
             raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)

+        if not Path(storage.file_options.uri).is_absolute():
+            absolute_path = Path(self.repo_path) / storage.file_options.uri
+        else:
+            absolute_path = Path(storage.file_options.uri)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            storage.file_options.uri,
+            str(absolute_path),
             storage.file_options.s3_endpoint_override,
         )

@@ -243,7 +250,9 @@ def evaluate_historical_retrieval():

                 all_join_keys = list(set(all_join_keys + join_keys))

-                df_to_join = _read_datasource(feature_view.batch_source)
+                df_to_join = _read_datasource(
+                    feature_view.batch_source, config.repo_path
+                )

                 df_to_join, timestamp_field = _field_mapping(
                     df_to_join,
@@ -297,6 +306,7 @@ def evaluate_historical_retrieval():
                 min_event_timestamp=entity_df_event_timestamp_range[0],
                 max_event_timestamp=entity_df_event_timestamp_range[1],
             ),
+            repo_path=str(config.repo_path),
         )
         return job

@@ -316,7 +326,7 @@ def pull_latest_from_table_or_query(

         # Create lazy function that is only called from the RetrievalJob object
         def evaluate_offline_job():
-            source_df = _read_datasource(data_source)
+            source_df = _read_datasource(data_source, config.repo_path)

             source_df = _normalize_timestamp(
                 source_df, timestamp_field, created_timestamp_column
@@ -377,6 +387,7 @@ def evaluate_offline_job():
         return DaskRetrievalJob(
             evaluation_function=evaluate_offline_job,
             full_feature_names=False,
+            repo_path=str(config.repo_path),
         )

     @staticmethod
@@ -420,8 +431,13 @@ def write_logged_features(
         # Since this code will be mostly used from Go-created thread, it's better to avoid producing new threads
         data = pyarrow.parquet.read_table(data, use_threads=False, pre_buffer=False)

+        if config.repo_path is not None and not Path(destination.path).is_absolute():
+            absolute_path = config.repo_path / destination.path
+        else:
+            absolute_path = Path(destination.path)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            destination.path,
+            str(absolute_path),
             destination.s3_endpoint_override,
         )

@@ -456,8 +472,14 @@ def offline_write_batch(
         )

         file_options = feature_view.batch_source.file_options
+
+        if config.repo_path is not None and not Path(file_options.uri).is_absolute():
+            absolute_path = config.repo_path / file_options.uri
+        else:
+            absolute_path = Path(file_options.uri)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            file_options.uri, file_options.s3_endpoint_override
+            str(absolute_path), file_options.s3_endpoint_override
         )
         prev_table = pyarrow.parquet.read_table(
             path, filesystem=filesystem, memory_map=True
@@ -493,7 +515,7 @@ def _get_entity_df_event_timestamp_range(
     )


-def _read_datasource(data_source) -> dd.DataFrame:
+def _read_datasource(data_source, repo_path) -> dd.DataFrame:
     storage_options = (
         {
             "client_kwargs": {
@@ -504,8 +526,12 @@ def _read_datasource(data_source) -> dd.DataFrame:
         else None
     )

+    if not Path(data_source.path).is_absolute():
+        path = repo_path / data_source.path
+    else:
+        path = data_source.path
     return dd.read_parquet(
-        data_source.path,
+        path,
         storage_options=storage_options,
     )
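
The same resolve-relative-to-the-repo logic recurs in the DuckDB and FileSource hunks below. Distilled into a hypothetical standalone helper (not part of the Feast API; local paths only):

```python
from pathlib import Path
from typing import Optional, Union


def resolve_against_repo(
    uri: Union[str, Path], repo_path: Optional[Union[str, Path]]
) -> str:
    # Relative local paths are anchored at the feature repo root; absolute
    # paths (or a missing repo_path) pass through unchanged.
    if repo_path is not None and not Path(uri).is_absolute():
        return str(Path(repo_path) / uri)
    return str(uri)


assert resolve_against_repo("data/stats.parquet", "/repo") == "/repo/data/stats.parquet"
assert resolve_against_repo("/abs/stats.parquet", "/repo") == "/abs/stats.parquet"
```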
17 changes: 14 additions & 3 deletions sdk/python/feast/infra/offline_stores/duckdb.py
@@ -27,7 +27,7 @@
 from feast.repo_config import FeastConfigBaseModel, RepoConfig


-def _read_data_source(data_source: DataSource) -> Table:
+def _read_data_source(data_source: DataSource, repo_path: str) -> Table:
     assert isinstance(data_source, FileSource)

     if isinstance(data_source.file_format, ParquetFormat):
@@ -43,21 +43,32 @@ def _read_data_source(data_source: DataSource) -> Table:
 def _write_data_source(
     table: Table,
     data_source: DataSource,
+    repo_path: str,
     mode: str = "append",
     allow_overwrite: bool = False,
 ):
     assert isinstance(data_source, FileSource)

     file_options = data_source.file_options

-    if mode == "overwrite" and not allow_overwrite and os.path.exists(file_options.uri):
+    if not Path(file_options.uri).is_absolute():
+        absolute_path = Path(repo_path) / file_options.uri
+    else:
+        absolute_path = Path(file_options.uri)
+
+    if (
+        mode == "overwrite"
+        and not allow_overwrite
+        and os.path.exists(str(absolute_path))
+    ):
         raise SavedDatasetLocationAlreadyExists(location=file_options.uri)

     if isinstance(data_source.file_format, ParquetFormat):
         if mode == "overwrite":
             table = table.to_pyarrow()

         filesystem, path = FileSource.create_filesystem_and_path(
-            file_options.uri,
+            str(absolute_path),
             file_options.s3_endpoint_override,
         )
11 changes: 10 additions & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
@@ -1,3 +1,4 @@
+from pathlib import Path
 from typing import Callable, Dict, Iterable, List, Optional, Tuple

 import pyarrow
@@ -154,8 +155,16 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
     def get_table_column_names_and_types(
         self, config: RepoConfig
     ) -> Iterable[Tuple[str, str]]:
+        if (
+            config.repo_path is not None
+            and not Path(self.file_options.uri).is_absolute()
+        ):
+            absolute_path = config.repo_path / self.file_options.uri
+        else:
+            absolute_path = Path(self.file_options.uri)
+
         filesystem, path = FileSource.create_filesystem_and_path(
-            self.path, self.file_options.s3_endpoint_override
+            str(absolute_path), self.file_options.s3_endpoint_override
         )

         # TODO why None check necessary