Skip to content

Commit

Permalink
Extend context for usage statistics collection & add latencies for pe…
Browse files Browse the repository at this point in the history
…rformance analysis (#1983)

* Extend context for usage statistics collection & add latencies for performance analysis

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* uncomment test marks

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix UTs

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* make lint happy

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* remove prow hook for usage tests

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* adding decorators through code

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* consistent naming

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* deterministic tests

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* linting

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix conflicts

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix redis span

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* minimize overhead of telemetry

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* all datetimes in utc

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* format

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix tests

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* consistent naming

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* consistent naming

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add installation timestamp and environment signature to usage context

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* instantiate correct provider class

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add timeout to exporter

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* generalize provider selection

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Nov 5, 2021
1 parent 4d0b50f commit 963d3ac
Show file tree
Hide file tree
Showing 20 changed files with 815 additions and 424 deletions.
20 changes: 0 additions & 20 deletions .prow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,6 @@ presubmits:
branches:
- ^v0\.(3|4)-branch$

- name: test-usage
decorate: true
run_if_changed: "sdk/python/.*"
spec:
containers:
- image: python:3.7
command: ["infra/scripts/test-usage.sh"]
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/gcloud/service-account.json
volumeMounts:
- mountPath: /etc/gcloud/service-account.json
name: service-account
readOnly: true
subPath: service-account.json
volumes:
- name: service-account
secret:
secretName: feast-service-account

- name: test-golang-sdk
decorate: true
spec:
Expand Down
18 changes: 7 additions & 11 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from feast.repo_config import RepoConfig, load_repo_config
from feast.request_feature_view import RequestFeatureView
from feast.type_map import python_value_to_proto_value
from feast.usage import UsageEvent, log_event, log_exceptions, log_exceptions_and_usage
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
from feast.value_type import ValueType
from feast.version import get_version

Expand Down Expand Up @@ -464,8 +464,7 @@ def apply(
):
raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME)

if len(odfvs_to_update) > 0:
log_event(UsageEvent.APPLY_WITH_ODFV)
set_usage_attribute("odfv", bool(odfvs_to_update))

_validate_feature_views(
[*views_to_update, *odfvs_to_update, *request_views_to_update]
Expand Down Expand Up @@ -678,10 +677,9 @@ def get_historical_features(
feature_views = list(view for view, _ in fvs)
on_demand_feature_views = list(view for view, _ in odfvs)
request_feature_views = list(view for view, _ in request_fvs)
if len(on_demand_feature_views) > 0:
log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_ODFV)
if len(request_feature_views) > 0:
log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_REQUEST_FV)

set_usage_attribute("odfv", bool(on_demand_feature_views))
set_usage_attribute("request_fv", bool(request_feature_views))

# Check that the right request data is present in the entity_df
if type(entity_df) == pd.DataFrame:
Expand Down Expand Up @@ -973,10 +971,8 @@ def get_online_features(
all_request_feature_views,
all_on_demand_feature_views,
)
if len(grouped_odfv_refs) > 0:
log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV)
if len(grouped_request_fv_refs) > 0:
log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_REQUEST_FV)
set_usage_attribute("odfv", bool(grouped_odfv_refs))
set_usage_attribute("request_fv", bool(grouped_request_fv_refs))

feature_views = list(view for view, _ in grouped_refs)
entityless_case = DUMMY_ENTITY_NAME in [
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from feast.registry import get_registry_store_class_from_scheme
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage
from feast.version import get_version

try:
Expand All @@ -50,6 +51,7 @@


class AwsProvider(PassthroughProvider):
@log_exceptions_and_usage(provider="AwsProvider")
def update_infra(
self,
project: str,
Expand Down Expand Up @@ -188,6 +190,7 @@ def _deploy_feature_server(self, project: str, image_uri: str):
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
)

@log_exceptions_and_usage(provider="AwsProvider")
def teardown_infra(
self,
project: str,
Expand Down Expand Up @@ -216,6 +219,7 @@ def teardown_infra(
_logger.info(" Tearing down AWS API Gateway...")
aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"])

@log_exceptions_and_usage(provider="AwsProvider")
def get_feature_server_endpoint(self) -> Optional[str]:
project = self.repo_config.project
resource_name = _get_lambda_name(project)
Expand Down Expand Up @@ -334,6 +338,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
"s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL")
)

@log_exceptions_and_usage(registry="s3")
def get_registry_proto(self):
file_obj = TemporaryFile()
registry_proto = RegistryProto()
Expand Down Expand Up @@ -366,6 +371,7 @@ def get_registry_proto(self):
f"Error while trying to locate Registry at path {self._uri.geturl()}"
) from e

@log_exceptions_and_usage(registry="s3")
def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)

Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage


class GcpProvider(PassthroughProvider):
Expand All @@ -33,6 +34,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
self._bucket = self._uri.hostname
self._blob = self._uri.path.lstrip("/")

@log_exceptions_and_usage(registry="gs")
def get_registry_proto(self):
import google.cloud.storage as storage
from google.cloud.exceptions import NotFound
Expand All @@ -56,6 +58,7 @@ def get_registry_proto(self):
f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
)

@log_exceptions_and_usage(registry="gs")
def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)

Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage


class LocalProvider(PassthroughProvider):
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
else:
self._filepath = repo_path.joinpath(registry_path)

@log_exceptions_and_usage(registry="local")
def get_registry_proto(self):
registry_proto = RegistryProto()
if self._filepath.exists():
Expand All @@ -49,6 +51,7 @@ def get_registry_proto(self):
f'Registry not found at path "{self._filepath}". Have you run "feast apply"?'
)

@log_exceptions_and_usage(registry="local")
def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)

Expand Down
34 changes: 21 additions & 13 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig

from ...usage import log_exceptions_and_usage
from .bigquery_source import BigQuerySource

try:
Expand Down Expand Up @@ -62,6 +63,7 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):

class BigQueryOfflineStore(OfflineStore):
@staticmethod
@log_exceptions_and_usage(offline_store="bigquery")
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
Expand Down Expand Up @@ -113,6 +115,7 @@ def pull_latest_from_table_or_query(
)

@staticmethod
@log_exceptions_and_usage(offline_store="bigquery")
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
Expand Down Expand Up @@ -221,7 +224,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:

def _to_df_internal(self) -> pd.DataFrame:
with self._query_generator() as query:
df = self.client.query(query).to_dataframe(create_bqstorage_client=True)
df = self._execute_query(query).to_dataframe(create_bqstorage_client=True)
return df

def to_sql(self) -> str:
Expand Down Expand Up @@ -265,24 +268,29 @@ def to_bigquery(
return str(job_config.destination)

with self._query_generator() as query:
bq_job = self.client.query(query, job_config=job_config)

if job_config.dry_run:
print(
"This query will process {} bytes.".format(
bq_job.total_bytes_processed
)
)
return None

block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
self._execute_query(query, job_config, timeout)

print(f"Done writing to '{job_config.destination}'.")
return str(job_config.destination)

def _to_arrow_internal(self) -> pyarrow.Table:
with self._query_generator() as query:
return self.client.query(query).to_arrow()
return self._execute_query(query).to_arrow()

@log_exceptions_and_usage
def _execute_query(
self, query, job_config=None, timeout: int = 1800
) -> bigquery.job.query.QueryJob:
bq_job = self.client.query(query, job_config=job_config)

if job_config and job_config.dry_run:
print(
"This query will process {} bytes.".format(bq_job.total_bytes_processed)
)
return None

block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
return bq_job


def block_until_done(
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class FileOfflineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -51,11 +52,13 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
return df

@log_exceptions_and_usage
def _to_arrow_internal(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
Expand All @@ -64,6 +67,7 @@ def _to_arrow_internal(self):

class FileOfflineStore(OfflineStore):
@staticmethod
@log_exceptions_and_usage(offline_store="file")
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
Expand Down Expand Up @@ -264,6 +268,7 @@ def evaluate_historical_retrieval():
return job

@staticmethod
@log_exceptions_and_usage(offline_store="file")
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from feast.infra.utils import aws_utils
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -47,6 +48,7 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):

class RedshiftOfflineStore(OfflineStore):
@staticmethod
@log_exceptions_and_usage(offline_store="redshift")
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
Expand Down Expand Up @@ -103,6 +105,7 @@ def pull_latest_from_table_or_query(
)

@staticmethod
@log_exceptions_and_usage(offline_store="redshift")
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
Expand Down Expand Up @@ -227,6 +230,7 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_df(
Expand All @@ -240,6 +244,7 @@ def _to_df_internal(self) -> pd.DataFrame:
query,
)

@log_exceptions_and_usage
def _to_arrow_internal(self) -> pa.Table:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_pa(
Expand All @@ -253,6 +258,7 @@ def _to_arrow_internal(self) -> pa.Table:
query,
)

@log_exceptions_and_usage
def to_s3(self) -> str:
""" Export dataset to S3 in Parquet format and return path """
if self.on_demand_feature_views:
Expand All @@ -272,6 +278,7 @@ def to_s3(self) -> str:
)
return self._s3_path

@log_exceptions_and_usage
def to_redshift(self, table_name: str) -> None:
""" Save dataset as a new Redshift table """
if self.on_demand_feature_views:
Expand Down
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage, tracing_span

try:
from google.auth.exceptions import DefaultCredentialsError
Expand Down Expand Up @@ -69,6 +70,7 @@ class DatastoreOnlineStore(OnlineStore):

_client: Optional[datastore.Client] = None

@log_exceptions_and_usage(online_store="datastore")
def update(
self,
config: RepoConfig,
Expand Down Expand Up @@ -140,6 +142,7 @@ def _get_client(self, online_config: DatastoreOnlineStoreConfig):
)
return self._client

@log_exceptions_and_usage(online_store="datastore")
def online_write_batch(
self,
config: RepoConfig,
Expand Down Expand Up @@ -220,6 +223,7 @@ def _write_minibatch(
if progress:
progress(len(entities))

@log_exceptions_and_usage(online_store="datastore")
def online_read(
self,
config: RepoConfig,
Expand All @@ -245,7 +249,8 @@ def online_read(

# NOTE: get_multi doesn't return values in the same order as the keys in the request.
# Also, len(values) can be less than len(keys) in the case of missing values.
values = client.get_multi(keys)
with tracing_span(name="remote_call"):
values = client.get_multi(keys)
values_dict = {v.key: v for v in values} if values is not None else {}
for key in keys:
if key in values_dict:
Expand Down
Loading

0 comments on commit 963d3ac

Please sign in to comment.