Skip to content

Commit

Permalink
Don't lose materialization interval tracking when re-applying feature…
Browse files Browse the repository at this point in the history
… views (#1559)

* Don't lose materialization interval tracking when re-applying feature views

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* lint

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Add a test

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* lint

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Use better error + fix other errors

Signed-off-by: Jacob Klegar <jacob@tecton.ai>
  • Loading branch information
jklegar authored Jun 7, 2021
1 parent 7877828 commit 25daab3
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 20 deletions.
14 changes: 14 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ def materialize_incremental(
def tqdm_builder(length):
return tqdm(total=length, ncols=100)

start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

provider.materialize_single_feature_view(
feature_view,
start_date,
Expand All @@ -380,6 +383,10 @@ def tqdm_builder(length):
tqdm_builder,
)

self._registry.apply_materialization(
feature_view, self.project, start_date, end_date
)

@log_exceptions_and_usage
def materialize(
self,
Expand Down Expand Up @@ -442,6 +449,9 @@ def materialize(
def tqdm_builder(length):
return tqdm(total=length, ncols=100)

start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

provider.materialize_single_feature_view(
feature_view,
start_date,
Expand All @@ -451,6 +461,10 @@ def tqdm_builder(length):
tqdm_builder,
)

self._registry.apply_materialization(
feature_view, self.project, start_date, end_date
)

@log_exceptions_and_usage
def get_online_features(
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __init__(
self.name = name
self.entities = entities
self.features = features
self.tags = tags
self.tags = tags if tags is not None else {}

if isinstance(ttl, Duration):
self.ttl = timedelta(seconds=int(ttl.seconds))
Expand Down
6 changes: 0 additions & 6 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,6 @@ def materialize_single_feature_view(
created_timestamp_column,
) = _get_column_names(feature_view, entities)

start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

table = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
Expand All @@ -193,9 +190,6 @@ def materialize_single_feature_view(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

def get_historical_features(
self,
config: RepoConfig,
Expand Down
8 changes: 1 addition & 7 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytz
from tqdm import tqdm

from feast import FeatureTable, utils
from feast import FeatureTable
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
Expand Down Expand Up @@ -180,9 +180,6 @@ def materialize_single_feature_view(
created_timestamp_column,
) = _get_column_names(feature_view, entities)

start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

table = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
Expand All @@ -204,9 +201,6 @@ def materialize_single_feature_view(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

def get_historical_features(
self,
config: RepoConfig,
Expand Down
55 changes: 51 additions & 4 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,58 @@ def updater(registry_proto: RegistryProto):
== feature_view_proto.spec.name
and existing_feature_view_proto.spec.project == project
):
# do not update if feature view has not changed; updating will erase tracked materialization intervals
if (
FeatureView.from_proto(existing_feature_view_proto)
== feature_view
):
return registry_proto
else:
del registry_proto.feature_views[idx]
registry_proto.feature_views.append(feature_view_proto)
return registry_proto
registry_proto.feature_views.append(feature_view_proto)
return registry_proto

self._registry_store.update_registry_proto(updater)

def apply_materialization(
self,
feature_view: FeatureView,
project: str,
start_date: datetime,
end_date: datetime,
):
"""
Updates materialization intervals tracked for a single feature view in Feast
Args:
feature_view: Feature view that will be updated with an additional materialization interval tracked
project: Feast project that this feature view belongs to
start_date (datetime): Start date of the materialization interval to track
end_date (datetime): End date of the materialization interval to track
"""

def updater(registry_proto: RegistryProto):
for idx, existing_feature_view_proto in enumerate(
registry_proto.feature_views
):
if (
existing_feature_view_proto.spec.name == feature_view.name
and existing_feature_view_proto.spec.project == project
):
existing_feature_view = FeatureView.from_proto(
existing_feature_view_proto
)
existing_feature_view.materialization_intervals.append(
(start_date, end_date)
)
feature_view_proto = existing_feature_view.to_proto()
feature_view_proto.spec.project = project
del registry_proto.feature_views[idx]
registry_proto.feature_views.append(feature_view_proto)
return registry_proto
registry_proto.feature_views.append(feature_view_proto)
return registry_proto
raise FeatureViewNotFoundException(feature_view.name, project)

self._registry_store.update_registry_proto(updater)

Expand Down Expand Up @@ -249,7 +296,7 @@ def get_feature_table(self, name: str, project: str) -> FeatureTable:
and feature_table_proto.spec.project == project
):
return FeatureTable.from_proto(feature_table_proto)
raise FeatureTableNotFoundException(project, name)
raise FeatureTableNotFoundException(name, project)

def get_feature_view(self, name: str, project: str) -> FeatureView:
"""
Expand Down Expand Up @@ -291,7 +338,7 @@ def updater(registry_proto: RegistryProto):
):
del registry_proto.feature_tables[idx]
return registry_proto
raise FeatureTableNotFoundException(project, name)
raise FeatureTableNotFoundException(name, project)

self._registry_store.update_registry_proto(updater)
return
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pytz import utc


def make_tzaware(t: datetime):
def make_tzaware(t: datetime) -> datetime:
""" We assume tz-naive datetimes are UTC """
if t.tzinfo is None:
return t.replace(tzinfo=utc)
Expand Down
58 changes: 57 additions & 1 deletion sdk/python/tests/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from datetime import timedelta
from datetime import datetime, timedelta
from tempfile import mkstemp

import pytest
Expand Down Expand Up @@ -386,3 +386,59 @@ def test_apply_remote_repo():
online_store=SqliteOnlineStoreConfig(path=online_store_path),
)
)


@pytest.mark.parametrize(
"test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
)
@pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")])
def test_reapply_feature_view_success(test_feature_store, dataframe_source):
with prep_file_source(
df=dataframe_source, event_timestamp_column="ts_1"
) as file_source:

e = Entity(name="id", value_type=ValueType.STRING)

# Create Feature View
fv1 = FeatureView(
name="my_feature_view_1",
features=[Feature(name="string_col", dtype=ValueType.STRING)],
entities=["id"],
input=file_source,
ttl=timedelta(minutes=5),
)

# Register Feature View
test_feature_store.apply([fv1, e])

# Check Feature View
fv_stored = test_feature_store.get_feature_view(fv1.name)
assert len(fv_stored.materialization_intervals) == 0

# Run materialization
test_feature_store.materialize(datetime(2020, 1, 1), datetime(2021, 1, 1))

# Check Feature View
fv_stored = test_feature_store.get_feature_view(fv1.name)
assert len(fv_stored.materialization_intervals) == 1

# Apply again
test_feature_store.apply([fv1])

# Check Feature View
fv_stored = test_feature_store.get_feature_view(fv1.name)
assert len(fv_stored.materialization_intervals) == 1

# Change and apply Feature View
fv1 = FeatureView(
name="my_feature_view_1",
features=[Feature(name="int64_col", dtype=ValueType.INT64)],
entities=["id"],
input=file_source,
ttl=timedelta(minutes=5),
)
test_feature_store.apply([fv1])

# Check Feature View
fv_stored = test_feature_store.get_feature_view(fv1.name)
assert len(fv_stored.materialization_intervals) == 0

0 comments on commit 25daab3

Please sign in to comment.