Skip to content

Commit

Permalink
Better logging for materialize command (#1499)
Browse files Browse the repository at this point in the history
* Better logging for materialize command

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Address comments

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Move tqdm to FeatureStore class

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Rebase

Signed-off-by: Jacob Klegar <jacob@tecton.ai>
  • Loading branch information
jklegar authored May 11, 2021
1 parent b482e21 commit d46e4bd
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 16 deletions.
69 changes: 55 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import pandas as pd
from colorama import Fore, Style
from tqdm import tqdm

from feast import utils
from feast.entity import Entity
Expand Down Expand Up @@ -341,6 +342,12 @@ def materialize_incremental(
feature_view = self._registry.get_feature_view(name, self.project)
feature_views_to_materialize.append(feature_view)

_print_materialization_log(
None,
end_date,
len(feature_views_to_materialize),
self.config.online_store.type,
)
# TODO paging large loads
for feature_view in feature_views_to_materialize:
start_date = feature_view.most_recent_end_time
Expand All @@ -352,11 +359,23 @@ def materialize_incremental(
)
start_date = datetime.utcnow() - feature_view.ttl
provider = self._get_provider()
_print_materialization_log(start_date, end_date, feature_view)
print(
f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
f" from {Style.BRIGHT + Fore.GREEN}{start_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
f" to {Style.BRIGHT + Fore.GREEN}{end_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}:"
)

def tqdm_builder(length):
return tqdm(total=length, ncols=100)

provider.materialize_single_feature_view(
feature_view, start_date, end_date, self._registry, self.project
feature_view,
start_date,
end_date,
self._registry,
self.project,
tqdm_builder,
)
print(" done!")

def materialize(
self,
Expand Down Expand Up @@ -406,14 +425,28 @@ def materialize(
feature_view = self._registry.get_feature_view(name, self.project)
feature_views_to_materialize.append(feature_view)

_print_materialization_log(
start_date,
end_date,
len(feature_views_to_materialize),
self.config.online_store.type,
)
# TODO paging large loads
for feature_view in feature_views_to_materialize:
provider = self._get_provider()
_print_materialization_log(start_date, end_date, feature_view)
print(f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}:")

def tqdm_builder(length):
return tqdm(total=length, ncols=100)

provider.materialize_single_feature_view(
feature_view, start_date, end_date, self._registry, self.project
feature_view,
start_date,
end_date,
self._registry,
self.project,
tqdm_builder,
)
print(" done!")

def get_online_features(
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
Expand Down Expand Up @@ -600,11 +633,19 @@ def _get_table_entity_keys(
return entity_key_protos


def _print_materialization_log(start_date, end_date, feature_view):
    """Announce the materialization of a single feature view.

    Prints the feature view's name together with the start and end of the
    materialization window (each passed through ``astimezone()``), all
    highlighted in bright green. The line is left open (no trailing newline)
    and stdout is flushed immediately.
    """
    highlight = Style.BRIGHT + Fore.GREEN
    reset = Style.RESET_ALL

    # Resolve the pieces in the same order the message displays them.
    view_name = feature_view.name
    window_start = start_date.astimezone()
    window_end = end_date.astimezone()

    message = (
        f"Materializing feature view {highlight}{view_name}{reset}"
        f" from {highlight}{window_start}{reset}"
        f" to {highlight}{window_end}{reset}"
    )
    print(message, end="", flush=True)
def _print_materialization_log(
    start_date, end_date, num_feature_views: int, online_store: str
):
    """Print a summary line for an upcoming materialization run.

    The message reports how many feature views will be materialized, the
    time window, and the target online store, with each value highlighted
    in bright green. Timestamps are truncated to whole seconds and passed
    through ``astimezone()`` before printing.

    Args:
        start_date: Start of the window. When falsy (e.g. ``None``), the
            " from ..." part of the message is omitted.
        end_date: End of the window; always printed.
        num_feature_views: Number of feature views about to be materialized.
        online_store: Name of the online store being written to.
    """
    highlight = Style.BRIGHT + Fore.GREEN
    reset = Style.RESET_ALL

    def _emphasize(value):
        # Wrap a value in the highlight colour and reset the style after it.
        return f"{highlight}{value}{reset}"

    pieces = [f"Materializing {_emphasize(num_feature_views)} feature views"]
    if start_date:
        pieces.append(
            f" from {_emphasize(start_date.replace(microsecond=0).astimezone())}"
        )
    pieces.append(f" to {_emphasize(end_date.replace(microsecond=0).astimezone())}")
    # The embedded "\n" plus print's own newline leaves a blank line after
    # the summary.
    pieces.append(f" into the {_emphasize(online_store)} online store.\n")

    print("".join(pieces))
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas
import pyarrow
from google.auth.exceptions import DefaultCredentialsError
from tqdm import tqdm

from feast import FeatureTable, utils
from feast.entity import Entity
Expand Down Expand Up @@ -146,6 +147,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
entities = []
for entity_name in feature_view.entities:
Expand Down Expand Up @@ -178,7 +180,10 @@ def materialize_single_feature_view(
join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(project, feature_view, rows_to_write, None)
with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)
Expand Down
9 changes: 8 additions & 1 deletion sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pandas as pd
import pytz
from tqdm import tqdm

from feast import FeatureTable, utils
from feast.entity import Entity
Expand Down Expand Up @@ -123,6 +124,8 @@ def online_write_batch(
created_ts,
),
)
if progress:
progress(1)

def online_read(
self,
Expand Down Expand Up @@ -165,6 +168,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
entities = []
for entity_name in feature_view.entities:
Expand Down Expand Up @@ -197,7 +201,10 @@ def materialize_single_feature_view(
join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(project, feature_view, rows_to_write, None)
with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pandas
import pyarrow
from tqdm import tqdm

from feast import errors
from feast.entity import Entity
Expand Down Expand Up @@ -102,6 +103,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
pass

Expand Down
2 changes: 2 additions & 0 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
from tqdm import tqdm

from feast import Entity, FeatureTable, FeatureView, RepoConfig
from feast.infra.offline_stores.offline_store import RetrievalJob
Expand Down Expand Up @@ -49,6 +50,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
pass

Expand Down

0 comments on commit d46e4bd

Please sign in to comment.