Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better logging for materialize command #1499

Merged
merged 4 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 55 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import pandas as pd
from colorama import Fore, Style
from tqdm import tqdm

from feast import utils
from feast.entity import Entity
Expand Down Expand Up @@ -341,6 +342,12 @@ def materialize_incremental(
feature_view = self._registry.get_feature_view(name, self.project)
feature_views_to_materialize.append(feature_view)

_print_materialization_log(
None,
end_date,
len(feature_views_to_materialize),
self.config.online_store.type,
)
# TODO paging large loads
for feature_view in feature_views_to_materialize:
start_date = feature_view.most_recent_end_time
Expand All @@ -352,11 +359,23 @@ def materialize_incremental(
)
start_date = datetime.utcnow() - feature_view.ttl
provider = self._get_provider()
_print_materialization_log(start_date, end_date, feature_view)
print(
f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
f" from {Style.BRIGHT + Fore.GREEN}{start_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
f" to {Style.BRIGHT + Fore.GREEN}{end_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}:"
)

def tqdm_builder(length):
return tqdm(total=length, ncols=100)

provider.materialize_single_feature_view(
feature_view, start_date, end_date, self._registry, self.project
feature_view,
start_date,
end_date,
self._registry,
self.project,
tqdm_builder,
)
print(" done!")

def materialize(
self,
Expand Down Expand Up @@ -406,14 +425,28 @@ def materialize(
feature_view = self._registry.get_feature_view(name, self.project)
feature_views_to_materialize.append(feature_view)

_print_materialization_log(
start_date,
end_date,
len(feature_views_to_materialize),
self.config.online_store.type,
)
# TODO paging large loads
for feature_view in feature_views_to_materialize:
provider = self._get_provider()
_print_materialization_log(start_date, end_date, feature_view)
print(f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}:")

def tqdm_builder(length):
return tqdm(total=length, ncols=100)

provider.materialize_single_feature_view(
feature_view, start_date, end_date, self._registry, self.project
feature_view,
start_date,
end_date,
self._registry,
self.project,
tqdm_builder,
)
print(" done!")

def get_online_features(
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
Expand Down Expand Up @@ -600,11 +633,19 @@ def _get_table_entity_keys(
return entity_key_protos


def _print_materialization_log(start_date, end_date, feature_view):
print(
f"Materializing feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
f" from {Style.BRIGHT + Fore.GREEN}{start_date.astimezone()}{Style.RESET_ALL}"
f" to {Style.BRIGHT + Fore.GREEN}{end_date.astimezone()}{Style.RESET_ALL}",
end="",
flush=True,
)
def _print_materialization_log(
start_date, end_date, num_feature_views: int, online_store: str
):
if start_date:
print(
f"Materializing {Style.BRIGHT + Fore.GREEN}{num_feature_views}{Style.RESET_ALL} feature views"
f" from {Style.BRIGHT + Fore.GREEN}{start_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
f" to {Style.BRIGHT + Fore.GREEN}{end_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
f" into the {Style.BRIGHT + Fore.GREEN}{online_store}{Style.RESET_ALL} online store.\n"
)
else:
print(
f"Materializing {Style.BRIGHT + Fore.GREEN}{num_feature_views}{Style.RESET_ALL} feature views"
f" to {Style.BRIGHT + Fore.GREEN}{end_date.replace(microsecond=0).astimezone()}{Style.RESET_ALL}"
f" into the {Style.BRIGHT + Fore.GREEN}{online_store}{Style.RESET_ALL} online store.\n"
)
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas
import pyarrow
from google.auth.exceptions import DefaultCredentialsError
from tqdm import tqdm

from feast import FeatureTable, utils
from feast.entity import Entity
Expand Down Expand Up @@ -146,6 +147,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
entities = []
for entity_name in feature_view.entities:
Expand Down Expand Up @@ -178,7 +180,10 @@ def materialize_single_feature_view(
join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(project, feature_view, rows_to_write, None)
with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)
Expand Down
9 changes: 8 additions & 1 deletion sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pandas as pd
import pytz
from tqdm import tqdm

from feast import FeatureTable, utils
from feast.entity import Entity
Expand Down Expand Up @@ -123,6 +124,8 @@ def online_write_batch(
created_ts,
),
)
if progress:
progress(1)

def online_read(
self,
Expand Down Expand Up @@ -165,6 +168,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
entities = []
for entity_name in feature_view.entities:
Expand Down Expand Up @@ -197,7 +201,10 @@ def materialize_single_feature_view(
join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(project, feature_view, rows_to_write, None)
with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pandas
import pyarrow
from tqdm import tqdm

from feast import errors
from feast.entity import Entity
Expand Down Expand Up @@ -102,6 +103,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
pass

Expand Down
2 changes: 2 additions & 0 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
from tqdm import tqdm

from feast import Entity, FeatureTable, FeatureView, RepoConfig
from feast.infra.offline_stores.offline_store import RetrievalJob
Expand Down Expand Up @@ -49,6 +50,7 @@ def materialize_single_feature_view(
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
pass

Expand Down