diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 35d63777c5..d6f2da5b7b 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -20,6 +20,7 @@
 
 import pandas as pd
 from colorama import Fore, Style
+from tqdm import tqdm
 
 from feast import utils
 from feast.entity import Entity
@@ -341,6 +342,12 @@ def materialize_incremental(
                 feature_view = self._registry.get_feature_view(name, self.project)
                 feature_views_to_materialize.append(feature_view)
 
+        _print_materialization_log(
+            None,
+            end_date,
+            len(feature_views_to_materialize),
+            self.config.online_store.type,
+        )
         # TODO paging large loads
         for feature_view in feature_views_to_materialize:
             start_date = feature_view.most_recent_end_time
@@ -352,11 +359,23 @@ def materialize_incremental(
                     )
                 start_date = datetime.utcnow() - feature_view.ttl
             provider = self._get_provider()
-            _print_materialization_log(start_date, end_date, feature_view)
+            print(
+                f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
+                f" from {Style.BRIGHT + Fore.GREEN}{start_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
+                f" to {Style.BRIGHT + Fore.GREEN}{end_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}:"
+            )
+
+            def tqdm_builder(length):
+                return tqdm(total=length, ncols=100)
+
             provider.materialize_single_feature_view(
-                feature_view, start_date, end_date, self._registry, self.project
+                feature_view,
+                start_date,
+                end_date,
+                self._registry,
+                self.project,
+                tqdm_builder,
             )
-            print(" done!")
 
     def materialize(
         self,
@@ -406,14 +425,28 @@ def materialize(
                 feature_view = self._registry.get_feature_view(name, self.project)
                 feature_views_to_materialize.append(feature_view)
 
+        _print_materialization_log(
+            start_date,
+            end_date,
+            len(feature_views_to_materialize),
+            self.config.online_store.type,
+        )
         # TODO paging large loads
         for feature_view in feature_views_to_materialize:
             provider = self._get_provider()
-            _print_materialization_log(start_date, end_date, feature_view)
+            print(f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}:")
+
+            def tqdm_builder(length):
+                return tqdm(total=length, ncols=100)
+
             provider.materialize_single_feature_view(
-                feature_view, start_date, end_date, self._registry, self.project
+                feature_view,
+                start_date,
+                end_date,
+                self._registry,
+                self.project,
+                tqdm_builder,
             )
-            print(" done!")
 
     def get_online_features(
         self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
@@ -600,11 +633,19 @@ def _get_table_entity_keys(
     return entity_key_protos
 
 
-def _print_materialization_log(start_date, end_date, feature_view):
-    print(
-        f"Materializing feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
-        f" from {Style.BRIGHT + Fore.GREEN}{start_date.astimezone()}{Style.RESET_ALL}"
-        f" to {Style.BRIGHT + Fore.GREEN}{end_date.astimezone()}{Style.RESET_ALL}",
-        end="",
-        flush=True,
-    )
+def _print_materialization_log(
+    start_date, end_date, num_feature_views: int, online_store: str
+):
+    if start_date:
+        print(
+            f"Materializing {Style.BRIGHT + Fore.GREEN}{num_feature_views}{Style.RESET_ALL} feature views"
+            f" from {Style.BRIGHT + Fore.GREEN}{start_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
+            f" to {Style.BRIGHT + Fore.GREEN}{end_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
+            f" into the {Style.BRIGHT + Fore.GREEN}{online_store}{Style.RESET_ALL} online store.\n"
+        )
+    else:
+        print(
+            f"Materializing {Style.BRIGHT + Fore.GREEN}{num_feature_views}{Style.RESET_ALL} feature views"
+            f" to {Style.BRIGHT + Fore.GREEN}{end_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
+            f" into the {Style.BRIGHT + Fore.GREEN}{online_store}{Style.RESET_ALL} online store.\n"
+        )
diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py
index 1557c39ac5..b5f0ee8958 100644
--- a/sdk/python/feast/infra/gcp.py
+++ b/sdk/python/feast/infra/gcp.py
@@ -7,6 +7,7 @@
 import pandas
 import pyarrow
 from google.auth.exceptions import DefaultCredentialsError
+from tqdm import tqdm
 
 from feast import FeatureTable, utils
 from feast.entity import Entity
@@ -146,6 +147,7 @@ def materialize_single_feature_view(
         end_date: datetime,
         registry: Registry,
         project: str,
+        tqdm_builder: Callable[[int], tqdm],
     ) -> None:
         entities = []
         for entity_name in feature_view.entities:
@@ -178,7 +180,10 @@ def materialize_single_feature_view(
         join_keys = [entity.join_key for entity in entities]
         rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
 
-        self.online_write_batch(project, feature_view, rows_to_write, None)
+        with tqdm_builder(len(rows_to_write)) as pbar:
+            self.online_write_batch(
+                project, feature_view, rows_to_write, lambda x: pbar.update(x)
+            )
 
         feature_view.materialization_intervals.append((start_date, end_date))
         registry.apply_feature_view(feature_view, project)
diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py
index 4ee064fab2..0d6b16c950 100644
--- a/sdk/python/feast/infra/local.py
+++ b/sdk/python/feast/infra/local.py
@@ -6,6 +6,7 @@
 
 import pandas as pd
 import pytz
+from tqdm import tqdm
 
 from feast import FeatureTable, utils
 from feast.entity import Entity
@@ -123,6 +124,8 @@ def online_write_batch(
                             created_ts,
                         ),
                     )
+                if progress:
+                    progress(1)
 
     def online_read(
         self,
@@ -165,6 +168,7 @@ def materialize_single_feature_view(
         end_date: datetime,
         registry: Registry,
         project: str,
+        tqdm_builder: Callable[[int], tqdm],
     ) -> None:
         entities = []
         for entity_name in feature_view.entities:
@@ -197,7 +201,10 @@ def materialize_single_feature_view(
         join_keys = [entity.join_key for entity in entities]
         rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
 
-        self.online_write_batch(project, feature_view, rows_to_write, None)
+        with tqdm_builder(len(rows_to_write)) as pbar:
+            self.online_write_batch(
+                project, feature_view, rows_to_write, lambda x: pbar.update(x)
+            )
 
         feature_view.materialization_intervals.append((start_date, end_date))
         registry.apply_feature_view(feature_view, project)
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index fbb8bb021e..ed73bafffc 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -6,6 +6,7 @@
 
 import pandas
 import pyarrow
+from tqdm import tqdm
 
 from feast import errors
 from feast.entity import Entity
@@ -102,6 +103,7 @@ def materialize_single_feature_view(
         end_date: datetime,
         registry: Registry,
         project: str,
+        tqdm_builder: Callable[[int], tqdm],
     ) -> None:
         pass
 
diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py
index 658abbffd1..b8705eef99 100644
--- a/sdk/python/tests/foo_provider.py
+++ b/sdk/python/tests/foo_provider.py
@@ -2,6 +2,7 @@
 from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
 
 import pandas
+from tqdm import tqdm
 
 from feast import Entity, FeatureTable, FeatureView, RepoConfig
 from feast.infra.offline_stores.offline_store import RetrievalJob
@@ -49,6 +50,7 @@ def materialize_single_feature_view(
         end_date: datetime,
         registry: Registry,
         project: str,
+        tqdm_builder: Callable[[int], tqdm],
     ) -> None:
         pass
 