diff --git a/.prow.yaml b/.prow.yaml index e614e4a2f9..f541f81e0e 100644 --- a/.prow.yaml +++ b/.prow.yaml @@ -64,26 +64,6 @@ presubmits: branches: - ^v0\.(3|4)-branch$ -- name: test-usage - decorate: true - run_if_changed: "sdk/python/.*" - spec: - containers: - - image: python:3.7 - command: ["infra/scripts/test-usage.sh"] - env: - - name: GOOGLE_APPLICATION_CREDENTIALS - value: /etc/gcloud/service-account.json - volumeMounts: - - mountPath: /etc/gcloud/service-account.json - name: service-account - readOnly: true - subPath: service-account.json - volumes: - - name: service-account - secret: - secretName: feast-service-account - - name: test-golang-sdk decorate: true spec: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f2d4ba1791..bfc93e7f53 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -59,7 +59,7 @@ from feast.repo_config import RepoConfig, load_repo_config from feast.request_feature_view import RequestFeatureView from feast.type_map import python_value_to_proto_value -from feast.usage import UsageEvent, log_event, log_exceptions, log_exceptions_and_usage +from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute from feast.value_type import ValueType from feast.version import get_version @@ -464,8 +464,7 @@ def apply( ): raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME) - if len(odfvs_to_update) > 0: - log_event(UsageEvent.APPLY_WITH_ODFV) + set_usage_attribute("odfv", bool(odfvs_to_update)) _validate_feature_views( [*views_to_update, *odfvs_to_update, *request_views_to_update] @@ -678,10 +677,9 @@ def get_historical_features( feature_views = list(view for view, _ in fvs) on_demand_feature_views = list(view for view, _ in odfvs) request_feature_views = list(view for view, _ in request_fvs) - if len(on_demand_feature_views) > 0: - log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_ODFV) - if len(request_feature_views) > 0: - log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_REQUEST_FV) + + set_usage_attribute("odfv", bool(on_demand_feature_views)) + set_usage_attribute("request_fv", bool(request_feature_views)) # Check that the right request data is present in the entity_df if type(entity_df) == pd.DataFrame: @@ -973,10 +971,8 @@ def get_online_features( all_request_feature_views, all_on_demand_feature_views, ) - if len(grouped_odfv_refs) > 0: - log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV) - if len(grouped_request_fv_refs) > 0: - log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_REQUEST_FV) + set_usage_attribute("odfv", bool(grouped_odfv_refs)) + set_usage_attribute("request_fv", bool(grouped_request_fv_refs)) feature_views = list(view for view, _ in grouped_refs) entityless_case = DUMMY_ENTITY_NAME in [ diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 33aee669cd..f0348d2901 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -37,6 +37,7 @@ from feast.registry import get_registry_store_class_from_scheme from feast.registry_store import RegistryStore from feast.repo_config import RegistryConfig +from feast.usage import log_exceptions_and_usage from feast.version import get_version try: @@ -50,6 +51,7 @@ class AwsProvider(PassthroughProvider): + @log_exceptions_and_usage(provider="AwsProvider") def update_infra( self, project: str, @@ -188,6 +190,7 @@ def _deploy_feature_server(self, project: str, image_uri: str): 
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features", ) + @log_exceptions_and_usage(provider="AwsProvider") def teardown_infra( self, project: str, @@ -216,6 +219,7 @@ def teardown_infra( _logger.info(" Tearing down AWS API Gateway...") aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"]) + @log_exceptions_and_usage(provider="AwsProvider") def get_feature_server_endpoint(self) -> Optional[str]: project = self.repo_config.project resource_name = _get_lambda_name(project) @@ -334,6 +338,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path): "s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL") ) + @log_exceptions_and_usage(registry="s3") def get_registry_proto(self): file_obj = TemporaryFile() registry_proto = RegistryProto() @@ -366,6 +371,7 @@ def get_registry_proto(self): f"Error while trying to locate Registry at path {self._uri.geturl()}" ) from e + @log_exceptions_and_usage(registry="s3") def update_registry_proto(self, registry_proto: RegistryProto): self._write_registry(registry_proto) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 463a39af5a..257ae38d02 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -8,6 +8,7 @@ from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.registry_store import RegistryStore from feast.repo_config import RegistryConfig +from feast.usage import log_exceptions_and_usage class GcpProvider(PassthroughProvider): @@ -33,6 +34,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path): self._bucket = self._uri.hostname self._blob = self._uri.path.lstrip("/") + @log_exceptions_and_usage(registry="gs") def get_registry_proto(self): import google.cloud.storage as storage from google.cloud.exceptions import NotFound @@ -56,6 +58,7 @@ def get_registry_proto(self): f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?' ) + @log_exceptions_and_usage(registry="gs") def update_registry_proto(self, registry_proto: RegistryProto): self._write_registry(registry_proto) diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 6ac3ce7259..d1dbd259ef 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -11,6 +11,7 @@ from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.registry_store import RegistryStore from feast.repo_config import RegistryConfig +from feast.usage import log_exceptions_and_usage class LocalProvider(PassthroughProvider): @@ -40,6 +41,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path): else: self._filepath = repo_path.joinpath(registry_path) + @log_exceptions_and_usage(registry="local") def get_registry_proto(self): registry_proto = RegistryProto() if self._filepath.exists(): @@ -49,6 +51,7 @@ def get_registry_proto(self): f'Registry not found at path "{self._filepath}". Have you run "feast apply"?' 
) + @log_exceptions_and_usage(registry="local") def update_registry_proto(self, registry_proto: RegistryProto): self._write_registry(registry_proto) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 0a8c61a02b..15c077ecbf 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -26,6 +26,7 @@ from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig +from ...usage import log_exceptions_and_usage from .bigquery_source import BigQuerySource try: @@ -62,6 +63,7 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): class BigQueryOfflineStore(OfflineStore): @staticmethod + @log_exceptions_and_usage(offline_store="bigquery") def pull_latest_from_table_or_query( config: RepoConfig, data_source: DataSource, @@ -113,6 +115,7 @@ def pull_latest_from_table_or_query( ) @staticmethod + @log_exceptions_and_usage(offline_store="bigquery") def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], @@ -221,7 +224,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: def _to_df_internal(self) -> pd.DataFrame: with self._query_generator() as query: - df = self.client.query(query).to_dataframe(create_bqstorage_client=True) + df = self._execute_query(query).to_dataframe(create_bqstorage_client=True) return df def to_sql(self) -> str: @@ -265,24 +268,29 @@ def to_bigquery( return str(job_config.destination) with self._query_generator() as query: - bq_job = self.client.query(query, job_config=job_config) - - if job_config.dry_run: - print( - "This query will process {} bytes.".format( - bq_job.total_bytes_processed - ) - ) - return None - - block_until_done(client=self.client, bq_job=bq_job, timeout=timeout) + self._execute_query(query, job_config, timeout) print(f"Done writing to '{job_config.destination}'.") return str(job_config.destination) def _to_arrow_internal(self) -> pyarrow.Table: with self._query_generator() as query: - return self.client.query(query).to_arrow() + return self._execute_query(query).to_arrow() + + @log_exceptions_and_usage + def _execute_query( + self, query, job_config=None, timeout: int = 1800 + ) -> bigquery.job.query.QueryJob: + bq_job = self.client.query(query, job_config=job_config) + + if job_config and job_config.dry_run: + print( + "This query will process {} bytes.".format(bq_job.total_bytes_processed) + ) + return None + + block_until_done(client=self.client, bq_job=bq_job, timeout=timeout) + return bq_job def block_until_done( diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index a38d175b11..bce32e92b3 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -20,6 +20,7 @@ ) from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage class FileOfflineStoreConfig(FeastConfigBaseModel): @@ -51,11 +52,13 @@ def full_feature_names(self) -> bool: def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: return self._on_demand_feature_views + @log_exceptions_and_usage def _to_df_internal(self) -> pd.DataFrame: # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. 
df = self.evaluation_function() return df + @log_exceptions_and_usage def _to_arrow_internal(self): # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. df = self.evaluation_function() @@ -64,6 +67,7 @@ def _to_arrow_internal(self): class FileOfflineStore(OfflineStore): @staticmethod + @log_exceptions_and_usage(offline_store="file") def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], @@ -264,6 +268,7 @@ def evaluate_historical_retrieval(): return job @staticmethod + @log_exceptions_and_usage(offline_store="file") def pull_latest_from_table_or_query( config: RepoConfig, data_source: DataSource, diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 8c14a7d237..e8a7c985d4 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -18,6 +18,7 @@ from feast.infra.utils import aws_utils from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage class RedshiftOfflineStoreConfig(FeastConfigBaseModel): @@ -47,6 +48,7 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel): class RedshiftOfflineStore(OfflineStore): @staticmethod + @log_exceptions_and_usage(offline_store="redshift") def pull_latest_from_table_or_query( config: RepoConfig, data_source: DataSource, @@ -103,6 +105,7 @@ def pull_latest_from_table_or_query( ) @staticmethod + @log_exceptions_and_usage(offline_store="redshift") def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], @@ -227,6 +230,7 @@ def full_feature_names(self) -> bool: def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: return self._on_demand_feature_views + @log_exceptions_and_usage def _to_df_internal(self) -> pd.DataFrame: with self._query_generator() as query: return aws_utils.unload_redshift_query_to_df( @@ -240,6 +244,7 @@ def _to_df_internal(self) -> pd.DataFrame: query, ) + @log_exceptions_and_usage def _to_arrow_internal(self) -> pa.Table: with self._query_generator() as query: return aws_utils.unload_redshift_query_to_pa( @@ -253,6 +258,7 @@ def _to_arrow_internal(self) -> pa.Table: query, ) + @log_exceptions_and_usage def to_s3(self) -> str: """ Export dataset to S3 in Parquet format and return path """ if self.on_demand_feature_views: @@ -272,6 +278,7 @@ def to_s3(self) -> str: ) return self._s3_path + @log_exceptions_and_usage def to_redshift(self, table_name: str) -> None: """ Save dataset as a new Redshift table """ if self.on_demand_feature_views: diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 37011c7d51..e9e5973edd 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -26,6 +26,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage, tracing_span try: from google.auth.exceptions import DefaultCredentialsError @@ -69,6 +70,7 @@ class DatastoreOnlineStore(OnlineStore): _client: Optional[datastore.Client] = None + @log_exceptions_and_usage(online_store="datastore") def update( self, config: RepoConfig, @@ -140,6 +142,7 @@ def _get_client(self, online_config: 
DatastoreOnlineStoreConfig): ) return self._client + @log_exceptions_and_usage(online_store="datastore") def online_write_batch( self, config: RepoConfig, @@ -220,6 +223,7 @@ def _write_minibatch( if progress: progress(len(entities)) + @log_exceptions_and_usage(online_store="datastore") def online_read( self, config: RepoConfig, @@ -245,7 +249,8 @@ def online_read( # NOTE: get_multi doesn't return values in the same order as the keys in the request. # Also, len(values) can be less than len(keys) in the case of missing values. - values = client.get_multi(keys) + with tracing_span(name="remote_call"): + values = client.get_multi(keys) values_dict = {v.key: v for v in values} if values is not None else {} for key in keys: if key in values_dict: diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 722a081f2e..7ff6702dea 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -23,6 +23,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage, tracing_span try: import boto3 @@ -48,6 +49,7 @@ class DynamoDBOnlineStore(OnlineStore): Online feature store for AWS DynamoDB. """ + @log_exceptions_and_usage(online_store="dynamodb") def update( self, config: RepoConfig, @@ -97,6 +99,7 @@ def teardown( self._delete_tables_idempotent(dynamodb_resource, config, tables) + @log_exceptions_and_usage(online_store="dynamodb") def online_write_batch( self, config: RepoConfig, @@ -127,6 +130,7 @@ def online_write_batch( if progress: progress(1) + @log_exceptions_and_usage(online_store="dynamodb") def online_read( self, config: RepoConfig, @@ -142,7 +146,8 @@ def online_read( for entity_key in entity_keys: table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}") entity_id = compute_entity_id(entity_key) - response = table_instance.get_item(Key={"entity_id": entity_id}) + with tracing_span(name="remote_call"): + response = table_instance.get_item(Key={"entity_id": entity_id}) value = response.get("Item") if value is not None: diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index b1362ea534..c6df0c69fa 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -37,6 +37,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +from feast.usage import log_exceptions_and_usage, tracing_span try: from redis import Redis @@ -88,6 +89,7 @@ def delete_table_values( logger.debug(f"Deleted {deleted_count} keys for {table.name}") + @log_exceptions_and_usage(online_store="redis") def update( self, config: RepoConfig, @@ -160,6 +162,7 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): self._client = Redis(**kwargs) return self._client + @log_exceptions_and_usage(online_store="redis") def online_write_batch( self, config: RepoConfig, @@ -223,6 +226,7 @@ def online_write_batch( if progress: progress(len(results)) + @log_exceptions_and_usage(online_store="redis") def online_read( self, config: RepoConfig, @@ -255,7 +259,8 @@ def online_read( with client.pipeline() as pipe: for redis_key_bin in keys: 
pipe.hmget(redis_key_bin, hset_keys) - redis_values = pipe.execute() + with tracing_span(name="remote_call"): + redis_values = pipe.execute() for values in redis_values: features = self._get_features_for_entity( values, feature_view, requested_features diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 4be90257dd..62642153f9 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import itertools import os import sqlite3 from datetime import datetime @@ -29,6 +29,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage, tracing_span class SqliteOnlineStoreConfig(FeastConfigBaseModel): @@ -73,6 +74,7 @@ def _get_conn(self, config: RepoConfig): ) return self._conn + @log_exceptions_and_usage(online_store="sqlite") def online_write_batch( self, config: RepoConfig, @@ -127,6 +129,7 @@ def online_write_batch( if progress: progress(1) + @log_exceptions_and_usage(online_store="sqlite") def online_read( self, config: RepoConfig, @@ -134,24 +137,30 @@ def online_read( entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - pass conn = self._get_conn(config) cur = conn.cursor() result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - project = config.project - for entity_key in entity_keys: - entity_key_bin = serialize_entity_key(entity_key) - + with tracing_span(name="remote_call"): + # Fetch all entities in one go cur.execute( - f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?", - (entity_key_bin,), + f"SELECT entity_key, feature_name, value, event_ts " + f"FROM {_table_id(config.project, table)} " + f"WHERE entity_key IN ({','.join('?' 
* len(entity_keys))}) " + f"ORDER BY entity_key", + [serialize_entity_key(entity_key) for entity_key in entity_keys], ) + rows = cur.fetchall() + rows = { + k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0]) + } + for entity_key in entity_keys: + entity_key_bin = serialize_entity_key(entity_key) res = {} res_ts = None - for feature_name, val_bin, ts in cur.fetchall(): + for _, feature_name, val_bin, ts in rows.get(entity_key_bin, []): val = ValueProto() val.ParseFromString(val_bin) res[feature_name] = val @@ -163,6 +172,7 @@ def online_read( result.append((res_ts, res)) return result + @log_exceptions_and_usage(online_store="sqlite") def update( self, config: RepoConfig, diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 45745cfed0..06a9d3a8b3 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -21,6 +21,7 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry from feast.repo_config import RepoConfig +from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute class PassthroughProvider(Provider): @@ -44,6 +45,7 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): + set_usage_attribute("provider", self.__class__.__name__) self.online_store.update( config=self.repo_config, tables_to_delete=tables_to_delete, @@ -59,6 +61,7 @@ def teardown_infra( tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], ) -> None: + set_usage_attribute("provider", self.__class__.__name__) self.online_store.teardown(self.repo_config, tables, entities) def online_write_batch( @@ -70,8 +73,10 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: + set_usage_attribute("provider", self.__class__.__name__) self.online_store.online_write_batch(config, table, data, progress) + @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001)) def online_read( self, config: RepoConfig, @@ -79,6 +84,7 @@ def online_read( entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + set_usage_attribute("provider", self.__class__.__name__) result = self.online_store.online_read(config, table, entity_keys) return result @@ -86,6 +92,7 @@ def online_read( def ingest_df( self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame, ): + set_usage_attribute("provider", self.__class__.__name__) table = pa.Table.from_pandas(df) if feature_view.batch_source.field_mapping is not None: @@ -108,6 +115,8 @@ def materialize_single_feature_view( project: str, tqdm_builder: Callable[[int], tqdm], ) -> None: + set_usage_attribute("provider", self.__class__.__name__) + entities = [] for entity_name in feature_view.entities: entities.append(registry.get_entity(entity_name, project)) @@ -153,6 +162,8 @@ def get_historical_features( project: str, full_feature_names: bool, ) -> RetrievalJob: + set_usage_attribute("provider", self.__class__.__name__) + job = self.offline_store.get_historical_features( config=config, feature_views=feature_views, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index da3fb2c628..5546984f71 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -20,6 +20,12 @@ from feast.repo_config import RepoConfig from feast.type_map import 
python_value_to_proto_value +PROVIDERS_CLASS_FOR_TYPE = { + "gcp": "feast.infra.gcp.GcpProvider", + "aws": "feast.infra.aws.AwsProvider", + "local": "feast.infra.local.LocalProvider", +} + class Provider(abc.ABC): @abc.abstractmethod @@ -158,25 +164,20 @@ def get_feature_server_endpoint(self) -> Optional[str]: def get_provider(config: RepoConfig, repo_path: Path) -> Provider: if "." not in config.provider: - if config.provider in {"gcp", "aws", "local"}: - if config.provider == "aws": - from feast.infra.aws import AwsProvider - - return AwsProvider(config) - - from feast.infra.passthrough_provider import PassthroughProvider - - return PassthroughProvider(config) - else: + if config.provider not in PROVIDERS_CLASS_FOR_TYPE: raise errors.FeastProviderNotImplementedError(config.provider) + + provider = PROVIDERS_CLASS_FOR_TYPE[config.provider] else: - # Split provider into module and class names by finding the right-most dot. - # For example, provider 'foo.bar.MyProvider' will be parsed into 'foo.bar' and 'MyProvider' - module_name, class_name = config.provider.rsplit(".", 1) + provider = config.provider + + # Split provider into module and class names by finding the right-most dot. + # For example, provider 'foo.bar.MyProvider' will be parsed into 'foo.bar' and 'MyProvider' + module_name, class_name = provider.rsplit(".", 1) - cls = importer.get_class_from_type(module_name, class_name, "Provider") + cls = importer.get_class_from_type(module_name, class_name, "Provider") - return cls(config) + return cls(config) def _get_requested_feature_views_to_features_dict( diff --git a/sdk/python/feast/usage.py b/sdk/python/feast/usage.py index 1a0a89471d..1a2bf2e290 100644 --- a/sdk/python/feast/usage.py +++ b/sdk/python/feast/usage.py @@ -12,18 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import concurrent.futures +import contextlib import contextvars -import enum +import dataclasses +import hashlib import logging import os +import platform import sys +import typing import uuid -from collections import defaultdict from datetime import datetime from functools import wraps from os.path import expanduser, join from pathlib import Path -from typing import List, Optional, Tuple, Union import requests @@ -31,217 +33,346 @@ from feast.version import get_version USAGE_ENDPOINT = "https://usage.feast.dev" + _logger = logging.getLogger(__name__) +_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) -executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) -call_stack: contextvars.ContextVar = contextvars.ContextVar("call_stack", default=[]) +_is_enabled = os.getenv(FEAST_USAGE, default="True") == "True" +_constant_attributes = { + "session_id": str(uuid.uuid4()), + "installation_id": None, + "version": get_version(), + "python_version": platform.python_version(), + "platform": platform.platform(), + "env_signature": hashlib.md5( + ",".join( + sorted([k for k in os.environ.keys() if not k.startswith("FEAST")]) + ).encode() + ).hexdigest(), +} -@enum.unique -class UsageEvent(enum.Enum): - """ - An event meant to be logged - """ - UNKNOWN = 0 - APPLY_WITH_ODFV = 1 - GET_HISTORICAL_FEATURES_WITH_ODFV = 2 - GET_ONLINE_FEATURES_WITH_ODFV = 3 - GET_HISTORICAL_FEATURES_WITH_REQUEST_FV = 4 - GET_ONLINE_FEATURES_WITH_REQUEST_FV = 5 +@dataclasses.dataclass +class FnCall: + fn_name: str + id: str - def __str__(self): - return self.name.lower() + start: datetime + end: typing.Optional[datetime] = None + parent_id: typing.Optional[str] = None -class Usage: - def __init__(self): - self._usage_enabled: bool = False - self._is_test = os.getenv("FEAST_IS_USAGE_TEST", "False") == "True" - self._usage_counter = defaultdict(lambda: 0) - self.check_env_and_configure() - - def check_env_and_configure(self): - usage_enabled = ( - os.getenv(FEAST_USAGE, default="True") == "True" - ) # written this way to turn the env var string into a boolean - - # Check if it changed - if usage_enabled != self._usage_enabled: - self._usage_enabled = usage_enabled - - if self._usage_enabled: - try: - feast_home_dir = join(expanduser("~"), ".feast") - Path(feast_home_dir).mkdir(exist_ok=True) - usage_filepath = join(feast_home_dir, "usage") - - if os.path.exists(usage_filepath): - with open(usage_filepath, "r") as f: - self._usage_id = f.read() - else: - self._usage_id = str(uuid.uuid4()) - - with open(usage_filepath, "w") as f: - f.write(self._usage_id) - print( - "Feast is an open source project that collects anonymized error reporting and usage statistics. 
To opt out or learn" - " more see https://docs.feast.dev/reference/usage" - ) - except Exception as e: - _logger.debug(f"Unable to configure usage {e}") + +class Sampler: + def should_record(self, event) -> bool: + raise NotImplementedError @property - def usage_id(self) -> Optional[str]: - if os.getenv("FEAST_FORCE_USAGE_UUID"): - return os.getenv("FEAST_FORCE_USAGE_UUID") - return self._usage_id - - def _send_usage_request(self, json): - try: - future = executor.submit(requests.post, USAGE_ENDPOINT, json=json) - if self._is_test: - concurrent.futures.wait([future]) - except Exception as e: - if self._is_test: - raise e - else: - pass - - def log_function(self, function_name: str): - self.check_env_and_configure() - if self._usage_enabled and self.usage_id: - if "get_online_features" in call_stack.get() and not self.should_log_for_get_online_features_event( - "get_online_features" - ): - return - json = { - "function_name": function_name, - "usage_id": self.usage_id, - "timestamp": datetime.utcnow().isoformat(), - "version": get_version(), - "os": sys.platform, - "is_test": self._is_test, - } - self._send_usage_request(json) - - def increment_event_count(self, event_name: Union[UsageEvent, str]): - self._usage_counter[event_name] += 1 - - def should_log_for_get_online_features_event(self, event_name: str): - if self._usage_counter[event_name] % 10000 != 2: - return False - self._usage_counter[event_name] = 2 # avoid overflow + def priority(self): + return 0 + + +class AlwaysSampler(Sampler): + def should_record(self, event) -> bool: return True - def log_event(self, event: UsageEvent): - self.check_env_and_configure() - if self._usage_enabled and self.usage_id: - event_name = str(event) - if ( - event == UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV - and not self.should_log_for_get_online_features_event(event_name) - ): - return - json = { - "event_name": event_name, - "usage_id": self.usage_id, - "timestamp": datetime.utcnow().isoformat(), - "version": get_version(), - "os": sys.platform, - "is_test": self._is_test, - } - self._send_usage_request(json) - - def log_exception(self, error_type: str, traceback: List[Tuple[str, int, str]]): - self.check_env_and_configure() - if self._usage_enabled and self.usage_id: - json = { - "error_type": error_type, - "traceback": traceback, - "usage_id": self.usage_id, - "version": get_version(), - "os": sys.platform, - "is_test": self._is_test, - } - try: - requests.post(USAGE_ENDPOINT, json=json) - except Exception as e: - if self._is_test: - raise e - else: - pass - return - - -def log_exceptions(func): - @wraps(func) - def exception_logging_wrapper(*args, **kwargs): - try: - call_stack.set(call_stack.get() + [func.__name__]) - result = func(*args, **kwargs) - except Exception as e: - error_type = type(e).__name__ - trace_to_log = [] - tb = e.__traceback__ - while tb is not None: - trace_to_log.append( - ( - _trim_filename(tb.tb_frame.f_code.co_filename), - tb.tb_lineno, - tb.tb_frame.f_code.co_name, - ) + +class RatioSampler(Sampler): + MAX_COUNTER = (1 << 32) - 1 + + def __init__(self, ratio): + assert 0 < ratio <= 1, "Ratio must be within (0, 1]" + self.ratio = ratio + self.total_counter = 0 + self.sampled_counter = 0 + + def should_record(self, event) -> bool: + self.total_counter += 1 + if self.total_counter == self.MAX_COUNTER: + self.total_counter = 1 + self.sampled_counter = 1 + + decision = self.sampled_counter < self.ratio * self.total_counter + self.sampled_counter += int(decision) + return decision + + @property + def priority(self): + return 
int(1 / self.ratio) + + +class UsageContext: + attributes: typing.Dict[str, typing.Any] + + call_stack: typing.List[FnCall] + completed_calls: typing.List[FnCall] + + exception: typing.Optional[Exception] = None + traceback: typing.Optional[typing.Tuple[str, int, str]] = None + + sampler: Sampler = AlwaysSampler() + + def __init__(self): + self.attributes = {} + self.call_stack = [] + self.completed_calls = [] + + +_context = contextvars.ContextVar("usage_context", default=UsageContext()) + + +def _set_installation_id(): + if os.getenv("FEAST_FORCE_USAGE_UUID"): + _constant_attributes["installation_id"] = os.getenv("FEAST_FORCE_USAGE_UUID") + _constant_attributes["installation_ts"] = datetime.utcnow().isoformat() + return + + feast_home_dir = join(expanduser("~"), ".feast") + installation_timestamp = datetime.utcnow() + + try: + Path(feast_home_dir).mkdir(exist_ok=True) + usage_filepath = join(feast_home_dir, "usage") + + if os.path.exists(usage_filepath): + installation_timestamp = datetime.utcfromtimestamp( + os.path.getmtime(usage_filepath) + ) + with open(usage_filepath, "r") as f: + installation_id = f.read() + else: + installation_id = str(uuid.uuid4()) + + with open(usage_filepath, "w") as f: + f.write(installation_id) + print( + "Feast is an open source project that collects " + "anonymized error reporting and usage statistics. To opt out or learn" + " more see https://docs.feast.dev/reference/usage" + ) + except OSError as e: + _logger.debug(f"Unable to configure usage {e}") + installation_id = "undefined" + + _constant_attributes["installation_id"] = installation_id + _constant_attributes["installation_ts"] = installation_timestamp.isoformat() + + +_set_installation_id() + + +def _export(event: typing.Dict[str, typing.Any]): + _executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=30) + + +def _produce_event(ctx: UsageContext): + is_test = bool({"pytest", "unittest"} & sys.modules.keys()) + event = { + "timestamp": datetime.utcnow().isoformat(), + "is_test": is_test, + "is_webserver": ( + not is_test and bool({"uwsgi", "gunicorn", "fastapi"} & sys.modules.keys()) + ), + "calls": [ + dict( + fn_name=c.fn_name, + id=c.id, + parent_id=c.parent_id, + start=c.start and c.start.isoformat(), + end=c.end and c.end.isoformat(), + ) + for c in reversed(ctx.completed_calls) + ], + "entrypoint": ctx.completed_calls[-1].fn_name, + "exception": repr(ctx.exception) if ctx.exception else None, + "traceback": ctx.traceback if ctx.exception else None, + **_constant_attributes, + } + event.update(ctx.attributes) + + if ctx.sampler and not ctx.sampler.should_record(event): + return + + _export(event) + + +@contextlib.contextmanager +def tracing_span(name): + """ + Context manager for wrapping heavy parts of code in tracing span + """ + if _is_enabled: + ctx = _context.get() + if not ctx.call_stack: + raise RuntimeError("tracing_span must be called in usage context") + + last_call = ctx.call_stack[-1] + fn_call = FnCall( + id=uuid.uuid4().hex, + parent_id=last_call.id, + fn_name=f"{last_call.fn_name}.{name}", + start=datetime.utcnow(), + ) + try: + yield + finally: + if _is_enabled: + fn_call.end = datetime.utcnow() + ctx.completed_calls.append(fn_call) + + +def log_exceptions_and_usage(*args, **attrs): + """ + This function decorator enables three components: + 1. Error tracking + 2. Usage statistic collection + 3. Time profiling + + This data is being collected, anonymized and sent to Feast Developers. 
+    All events from nested decorated functions are grouped into a single event
+    to build a comprehensive context useful for profiling and error tracking.
+
+    Usage example (will result in one output event):
+        @log_exceptions_and_usage
+        def fn(...):
+            nested()
+
+        @log_exceptions_and_usage(attr='value')
+        def nested(...):
+            deeply_nested()
+
+        @log_exceptions_and_usage(attr2='value2', sampler=RatioSampler(ratio=0.1))
+        def deeply_nested(...):
+            ...
+    """
+    sampler = attrs.pop("sampler", AlwaysSampler())
+
+    def decorator(func):
+        if not _is_enabled:
+            return func
+
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            ctx = _context.get()
+            ctx.call_stack.append(
+                FnCall(
+                    id=uuid.uuid4().hex,
+                    parent_id=ctx.call_stack[-1].id if ctx.call_stack else None,
+                    fn_name=_fn_fullname(func),
+                    start=datetime.utcnow(),
                 )
-            tb = tb.tb_next
-        usage.log_exception(error_type, trace_to_log)
-        raise
-    finally:
-        if len(call_stack.get()) > 0:
-            call_stack.set(call_stack.get()[:-1])
-        return result
-
-    return exception_logging_wrapper
-
-
-def log_exceptions_and_usage(func):
-    @wraps(func)
-    def exception_logging_wrapper(*args, **kwargs):
-        try:
-            call_stack.set(call_stack.get() + [func.__name__])
-            usage.increment_event_count(func.__name__)
-            result = func(*args, **kwargs)
-            usage.log_function(func.__name__)
-        except Exception as e:
-            error_type = type(e).__name__
-            trace_to_log = []
-            tb = e.__traceback__
-            while tb is not None:
-                trace_to_log.append(
-                    (
-                        _trim_filename(tb.tb_frame.f_code.co_filename),
-                        tb.tb_lineno,
-                        tb.tb_frame.f_code.co_name,
-                    )
+            )
+            ctx.attributes.update(attrs)
+
+            try:
+                return func(*args, **kwargs)
+            except Exception:
+                if ctx.exception:
+                    # exception was already recorded
+                    raise
+
+                _, exc, traceback = sys.exc_info()
+                ctx.exception = exc
+                ctx.traceback = _trace_to_log(traceback)
+
+                if traceback:
+                    raise exc.with_traceback(traceback)
+
+                raise exc
+            finally:
+                last_call = ctx.call_stack.pop(-1)
+                last_call.end = datetime.utcnow()
+                ctx.completed_calls.append(last_call)
+                ctx.sampler = (
+                    sampler if sampler.priority > ctx.sampler.priority else ctx.sampler
                 )
-                tb = tb.tb_next
-            usage.log_exception(error_type, trace_to_log)
-            raise
-        finally:
-            if len(call_stack.get()) > 0:
-                call_stack.set(call_stack.get()[:-1])
-        return result
-    return exception_logging_wrapper
+                if not ctx.call_stack:
+                    # we reached the root of the stack
+                    _context.set(UsageContext())  # reset context to default values
+                    _produce_event(ctx)
+
+        return wrapper
+
+    if args:
+        return decorator(args[0])
+
+    return decorator
+
+
+def log_exceptions(*args, **attrs):
+    """
+    Function decorator that tracks errors and sends them to Feast developers
+    """
+
+    def decorator(func):
+        if not _is_enabled:
+            return func
+
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            if _context.get().call_stack:
+                # we're already inside a usage context
+                # let it handle the exception
+                return func(*args, **kwargs)
+
+            fn_call = FnCall(
+                id=uuid.uuid4().hex, fn_name=_fn_fullname(func), start=datetime.utcnow()
+            )
+            try:
+                return func(*args, **kwargs)
+            except Exception:
+                _, exc, traceback = sys.exc_info()
+                fn_call.end = datetime.utcnow()
-def log_event(event: UsageEvent):
-    usage.increment_event_count(event)
-    usage.log_event(event)
+                ctx = UsageContext()
+                ctx.exception = exc
+                ctx.traceback = _trace_to_log(traceback)
+                ctx.attributes = attrs
+                ctx.completed_calls.append(fn_call)
+                _produce_event(ctx)
+
+                if traceback:
+                    raise exc.with_traceback(traceback)
+
+                raise exc
+
+        return wrapper
+
+    if args:
+        return decorator(args[0])
+
+    return decorator
+
+
+def set_usage_attribute(name, value):
+    """
+    Extend the current context with a custom attribute
+    """
+    ctx = _context.get()
+    ctx.attributes[name] = value
 
 
 def _trim_filename(filename: str) -> str:
     return filename.split("/")[-1]
 
 
-# Single global usage object
-usage = Usage()
+def _fn_fullname(fn: typing.Callable):
+    return fn.__module__ + "." + fn.__qualname__
+
+
+def _trace_to_log(traceback):
+    log = []
+    while traceback is not None:
+        log.append(
+            (
+                _trim_filename(traceback.tb_frame.f_code.co_filename),
+                traceback.tb_lineno,
+                traceback.tb_frame.f_code.co_name,
+            )
+        )
+        traceback = traceback.tb_next
+
+    return log
diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py
index 0b6b638711..e5355b40bb 100644
--- a/sdk/python/tests/data/data_creator.py
+++ b/sdk/python/tests/data/data_creator.py
@@ -13,7 +13,7 @@ def create_dataset(
     feature_is_list: bool = False,
     list_has_empty_list: bool = False,
 ) -> pd.DataFrame:
-    now = datetime.now().replace(microsecond=0, second=0, minute=0)
+    now = datetime.utcnow().replace(microsecond=0, second=0, minute=0)
     ts = pd.Timestamp(now).round("ms")
     data = {
         "driver_id": get_entities_for_value_type(entity_type),
diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py
index a36aed7947..9718c40d38 100644
--- a/sdk/python/tests/integration/e2e/test_universal_e2e.py
+++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py
@@ -78,7 +78,7 @@ def check_offline_and_online_features(
 def run_offline_online_store_consistency_test(
     fs: FeatureStore, fv: FeatureView
 ) -> None:
-    now = datetime.now()
+    now = datetime.utcnow()
     full_feature_names = True
     check_offline_store: bool = True
diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py
new file mode 100644
index 0000000000..c05ce8daf4
--- /dev/null
+++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py
@@ -0,0 +1,135 @@
+# Copyright 2020 The Feast Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os +import sys +import tempfile +from importlib import reload +from unittest.mock import patch + +import pytest + +from feast import Entity, RepoConfig, ValueType, usage +from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig + + +@pytest.fixture(scope="function") +def dummy_exporter(): + event_log = [] + + with patch("feast.usage._export", new=event_log.append): + yield event_log + + +@pytest.mark.integration +def test_usage_on(dummy_exporter): + usage._is_enabled = True + + _reload_feast() + from feast.feature_store import FeatureStore + + with tempfile.TemporaryDirectory() as temp_dir: + test_feature_store = FeatureStore( + config=RepoConfig( + registry=os.path.join(temp_dir, "registry.db"), + project="fake_project", + provider="local", + online_store=SqliteOnlineStoreConfig( + path=os.path.join(temp_dir, "online.db") + ), + ) + ) + entity = Entity( + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + labels={"team": "matchmaking"}, + ) + + test_feature_store.apply([entity]) + + assert len(dummy_exporter) == 1 + assert { + "entrypoint": "feast.feature_store.FeatureStore.apply" + }.items() <= dummy_exporter[0].items() + + +@pytest.mark.integration +def test_usage_off(dummy_exporter): + usage._is_enabled = False + + _reload_feast() + from feast.feature_store import FeatureStore + + with tempfile.TemporaryDirectory() as temp_dir: + test_feature_store = FeatureStore( + config=RepoConfig( + registry=os.path.join(temp_dir, "registry.db"), + project="fake_project", + provider="local", + online_store=SqliteOnlineStoreConfig( + path=os.path.join(temp_dir, "online.db") + ), + ) + ) + entity = Entity( + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + labels={"team": "matchmaking"}, + ) + test_feature_store.apply([entity]) + + assert not dummy_exporter + + +@pytest.mark.integration +def test_exception_usage_on(dummy_exporter): + usage._is_enabled = True + + _reload_feast() + from feast.feature_store import FeatureStore + + with pytest.raises(OSError): + FeatureStore("/tmp/non_existent_directory") + + assert len(dummy_exporter) == 1 + assert { + "entrypoint": "feast.feature_store.FeatureStore.__init__", + "exception": repr(FileNotFoundError(2, "No such file or directory")), + }.items() <= dummy_exporter[0].items() + + +@pytest.mark.integration +def test_exception_usage_off(dummy_exporter): + usage._is_enabled = False + + _reload_feast() + from feast.feature_store import FeatureStore + + with pytest.raises(OSError): + FeatureStore("/tmp/non_existent_directory") + + assert not dummy_exporter + + +def _reload_feast(): + """ After changing environment need to reload modules and rerun usage decorators """ + modules = ( + "feast.infra.local", + "feast.infra.online_stores.sqlite", + "feast.feature_store", + ) + for mod in modules: + if mod in sys.modules: + reload(sys.modules[mod]) diff --git a/sdk/python/tests/unit/test_usage.py b/sdk/python/tests/unit/test_usage.py new file mode 100644 index 0000000000..13988d3264 --- /dev/null +++ b/sdk/python/tests/unit/test_usage.py @@ -0,0 +1,237 @@ +import datetime +import json +import time +from unittest.mock import patch + +import pytest + +from feast.usage import ( + RatioSampler, + log_exceptions, + log_exceptions_and_usage, + set_usage_attribute, + tracing_span, +) + + +@pytest.fixture(scope="function") +def dummy_exporter(): + event_log = [] + + with patch( + "feast.usage._export", + new=lambda e: event_log.append(json.loads(json.dumps(e))), + ): + yield event_log + 
+
+@pytest.fixture(scope="function", autouse=True)
+def enabling_patch():
+    with patch("feast.usage._is_enabled") as p:
+        p.__bool__.return_value = True
+        yield p
+
+
+def test_logging_disabled(dummy_exporter, enabling_patch):
+    enabling_patch.__bool__.return_value = False
+
+    @log_exceptions_and_usage(event="test-event")
+    def entrypoint():
+        pass
+
+    @log_exceptions(event="test-event")
+    def entrypoint2():
+        raise ValueError(1)
+
+    entrypoint()
+    with pytest.raises(ValueError):
+        entrypoint2()
+
+    assert not dummy_exporter
+
+
+def test_global_context_building(dummy_exporter):
+    @log_exceptions_and_usage(event="test-event")
+    def entrypoint(provider):
+        if provider == "one":
+            provider_one()
+        if provider == "two":
+            provider_two()
+
+    @log_exceptions_and_usage(provider="provider-one")
+    def provider_one():
+        dummy_layer()
+
+    @log_exceptions_and_usage(provider="provider-two")
+    def provider_two():
+        set_usage_attribute("new-attr", "new-val")
+
+    @log_exceptions_and_usage
+    def dummy_layer():
+        redis_store()
+
+    @log_exceptions_and_usage(store="redis")
+    def redis_store():
+        set_usage_attribute("attr", "val")
+
+    entrypoint(provider="one")
+    entrypoint(provider="two")
+
+    scope_name = "test_usage.test_global_context_building.<locals>"
+
+    assert dummy_exporter
+    assert {
+        "event": "test-event",
+        "provider": "provider-one",
+        "store": "redis",
+        "attr": "val",
+        "entrypoint": f"{scope_name}.entrypoint",
+    }.items() <= dummy_exporter[0].items()
+    assert dummy_exporter[0]["calls"][0]["fn_name"] == f"{scope_name}.entrypoint"
+    assert dummy_exporter[0]["calls"][1]["fn_name"] == f"{scope_name}.provider_one"
+    assert dummy_exporter[0]["calls"][2]["fn_name"] == f"{scope_name}.dummy_layer"
+    assert dummy_exporter[0]["calls"][3]["fn_name"] == f"{scope_name}.redis_store"
+
+    assert (
+        not {"store", "attr"} & dummy_exporter[1].keys()
+    )  # check that context was reset
+    assert {
+        "event": "test-event",
+        "provider": "provider-two",
+        "new-attr": "new-val",
+    }.items() <= dummy_exporter[1].items()
+
+
+def test_exception_recording(dummy_exporter):
+    @log_exceptions_and_usage(event="test-event")
+    def entrypoint():
+        provider()
+
+    @log_exceptions_and_usage(provider="provider-one")
+    def provider():
+        raise ValueError(1)
+
+    with pytest.raises(ValueError):
+        entrypoint()
+
+    assert dummy_exporter
+    assert {
+        "event": "test-event",
+        "provider": "provider-one",
+        "exception": repr(ValueError(1)),
+        "entrypoint": "test_usage.test_exception_recording.<locals>.entrypoint",
+    }.items() <= dummy_exporter[0].items()
+
+
+def test_only_exception_logging(dummy_exporter):
+    @log_exceptions(scope="exception-only")
+    def failing_fn():
+        raise ValueError(1)
+
+    @log_exceptions_and_usage(scope="usage-and-exception")
+    def entrypoint():
+        failing_fn()
+
+    with pytest.raises(ValueError):
+        failing_fn()
+
+    assert {
+        "exception": repr(ValueError(1)),
+        "scope": "exception-only",
+        "entrypoint": "test_usage.test_only_exception_logging.<locals>.failing_fn",
+    }.items() <= dummy_exporter[0].items()
+
+    with pytest.raises(ValueError):
+        entrypoint()
+
+    assert {
+        "exception": repr(ValueError(1)),
+        "scope": "usage-and-exception",
+        "entrypoint": "test_usage.test_only_exception_logging.<locals>.entrypoint",
+    }.items() <= dummy_exporter[1].items()
+
+
+def test_ratio_based_sampling(dummy_exporter):
+    @log_exceptions_and_usage()
+    def entrypoint():
+        expensive_fn()
+
+    @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.1))
+    def expensive_fn():
+        pass
+
+    for _ in range(100):
+        entrypoint()
+
+    assert len(dummy_exporter) == 10
+
+
+def test_sampling_priority(dummy_exporter):
+    @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.3))
+    def entrypoint():
+        expensive_fn()
+
+    @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.01))
+    def expensive_fn():
+        other_fn()
+
+    @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.1))
+    def other_fn():
+        pass
+
+    for _ in range(300):
+        entrypoint()
+
+    assert len(dummy_exporter) == 3
+
+
+def test_time_recording(dummy_exporter):
+    @log_exceptions_and_usage()
+    def entrypoint():
+        time.sleep(0.1)
+        expensive_fn()
+
+    @log_exceptions_and_usage()
+    def expensive_fn():
+        time.sleep(0.5)
+        other_fn()
+
+    @log_exceptions_and_usage()
+    def other_fn():
+        time.sleep(0.2)
+
+    entrypoint()
+
+    assert dummy_exporter
+    calls = dummy_exporter[0]["calls"]
+    assert call_length_ms(calls[0]) >= 800
+    assert call_length_ms(calls[0]) > call_length_ms(calls[1]) >= 700
+    assert call_length_ms(calls[1]) > call_length_ms(calls[2]) >= 200
+
+
+def test_profiling_decorator(dummy_exporter):
+    @log_exceptions_and_usage()
+    def entrypoint():
+        with tracing_span("custom_span"):
+            time.sleep(0.1)
+
+    entrypoint()
+
+    assert dummy_exporter
+
+    calls = dummy_exporter[0]["calls"]
+    assert len(calls)
+    assert call_length_ms(calls[0]) >= 100
+    assert call_length_ms(calls[1]) >= 100
+
+    assert (
+        calls[1]["fn_name"]
+        == "test_usage.test_profiling_decorator.<locals>.entrypoint.custom_span"
+    )
+
+
+def call_length_ms(call):
+    return (
+        datetime.datetime.fromisoformat(call["end"])
+        - datetime.datetime.fromisoformat(call["start"])
+    ).total_seconds() * 10 ** 3
diff --git a/sdk/python/usage_tests/test_usage.py b/sdk/python/usage_tests/test_usage.py
deleted file mode 100644
index e72e11b008..0000000000
--- a/sdk/python/usage_tests/test_usage.py
+++ /dev/null
@@ -1,157 +0,0 @@
-# Copyright 2020 The Feast Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import tempfile -import uuid -from datetime import datetime - -from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig -from tenacity import retry, wait_exponential, stop_after_attempt - -from google.cloud import bigquery -import os -from time import sleep - -from feast import Entity, ValueType, FeatureStore, RepoConfig - - -USAGE_BIGQUERY_TABLE = ( - "kf-feast.feast_telemetry.cloudfunctions_googleapis_com_cloud_functions" -) - - -def test_usage_on(): - old_environ = dict(os.environ) - test_usage_id = str(uuid.uuid4()) - os.environ["FEAST_FORCE_USAGE_UUID"] = test_usage_id - os.environ["FEAST_IS_USAGE_TEST"] = "True" - os.environ["FEAST_USAGE"] = "True" - - with tempfile.TemporaryDirectory() as temp_dir: - test_feature_store = FeatureStore( - config=RepoConfig( - registry=os.path.join(temp_dir, "registry.db"), - project="fake_project", - provider="local", - online_store=SqliteOnlineStoreConfig( - path=os.path.join(temp_dir, "online.db") - ), - ) - ) - entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - labels={"team": "matchmaking"}, - ) - - test_feature_store.apply([entity]) - - os.environ.clear() - os.environ.update(old_environ) - ensure_bigquery_usage_id_with_retry(test_usage_id) - - -def test_usage_off(): - old_environ = dict(os.environ) - test_usage_id = str(uuid.uuid4()) - os.environ["FEAST_IS_USAGE_TEST"] = "True" - os.environ["FEAST_USAGE"] = "False" - os.environ["FEAST_FORCE_USAGE_UUID"] = test_usage_id - - with tempfile.TemporaryDirectory() as temp_dir: - test_feature_store = FeatureStore( - config=RepoConfig( - registry=os.path.join(temp_dir, "registry.db"), - project="fake_project", - provider="local", - online_store=SqliteOnlineStoreConfig( - path=os.path.join(temp_dir, "online.db") - ), - ) - ) - entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - labels={"team": "matchmaking"}, - ) - test_feature_store.apply([entity]) - - os.environ.clear() - os.environ.update(old_environ) - sleep(30) - rows = read_bigquery_usage_id(test_usage_id) - assert rows.total_rows == 0 - - -def test_exception_usage_on(): - old_environ = dict(os.environ) - test_usage_id = str(uuid.uuid4()) - os.environ["FEAST_FORCE_USAGE_UUID"] = test_usage_id - os.environ["FEAST_IS_USAGE_TEST"] = "True" - os.environ["FEAST_USAGE"] = "True" - - try: - test_feature_store = FeatureStore("/tmp/non_existent_directory") - except: - pass - - os.environ.clear() - os.environ.update(old_environ) - ensure_bigquery_usage_id_with_retry(test_usage_id) - - -def test_exception_usage_off(): - old_environ = dict(os.environ) - test_usage_id = str(uuid.uuid4()) - os.environ["FEAST_IS_USAGE_TEST"] = "True" - os.environ["FEAST_USAGE"] = "False" - os.environ["FEAST_FORCE_USAGE_UUID"] = test_usage_id - - try: - test_feature_store = FeatureStore("/tmp/non_existent_directory") - except: - pass - - os.environ.clear() - os.environ.update(old_environ) - sleep(30) - rows = read_bigquery_usage_id(test_usage_id) - assert rows.total_rows == 0 - - -@retry(wait=wait_exponential(multiplier=1, min=1, max=10), stop=stop_after_attempt(7)) -def ensure_bigquery_usage_id_with_retry(usage_id): - rows = read_bigquery_usage_id(usage_id) - if rows.total_rows == 0: - raise Exception(f"Could not find usage id: {usage_id}") - - -def read_bigquery_usage_id(usage_id): - bq_client = bigquery.Client() - query = f""" - SELECT - usage_id - FROM ( - SELECT - JSON_EXTRACT(textPayload, '$.usage_id') AS usage_id - FROM - `{USAGE_BIGQUERY_TABLE}` - WHERE - 
timestamp >= TIMESTAMP(\"{datetime.utcnow().date().isoformat()}\")) - WHERE - usage_id = '\"{usage_id}\"' - """ - query_job = bq_client.query(query) - return query_job.result()
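
Reviewer notes on this patch. The decorators introduced in sdk/python/feast/usage.py group every nested decorated call into one exported event, keyed by the outermost call as the "entrypoint". A minimal sketch of how an instrumented call site composes the three primitives; the online_read function and attribute values below are hypothetical, only the feast.usage imports come from this patch, and setting FEAST_USAGE=False before import turns all of it into no-ops:

from feast.usage import log_exceptions_and_usage, set_usage_attribute, tracing_span

@log_exceptions_and_usage(online_store="toy")  # "toy" is a made-up attribute value
def online_read(keys):
    # Attributes set anywhere in the call tree land on the same merged event.
    set_usage_attribute("num_keys", len(keys))
    # tracing_span() records a timed child span; it raises RuntimeError
    # if used outside a decorated call.
    with tracing_span(name="remote_call"):
        return [k.upper() for k in keys]  # stand-in for the real store I/O

# Exactly one event is exported (in a background thread, to USAGE_ENDPOINT)
# when the call stack unwinds back to the entrypoint.
online_read(["a", "b"])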
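RatioSampler deserves a note: it is deterministic, not random. should_record() keeps running totals and admits an event whenever the admitted/seen ratio dips below the target, so ratio=0.1 keeps exactly every tenth event (the 1st, 11th, 21st, ...). When several frames in one call chain carry samplers, the finally block keeps the one with the highest priority (int(1 / ratio)), so the smallest ratio governs the merged event; this is why decorating PassthroughProvider.online_read with RatioSampler(ratio=0.001) cannot flood the endpoint. A quick check of the counting behaviour, assuming only the RatioSampler class from this patch:

from feast.usage import RatioSampler

sampler = RatioSampler(ratio=0.1)
decisions = [sampler.should_record({}) for _ in range(100)]
assert sum(decisions) == 10                # exactly 10%, not approximately
assert decisions[0] and not decisions[1]   # the 1st, 11th, 21st, ... are kept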
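The sqlite online_read rewrite collapses one SELECT per entity key into a single IN (...) query. Correctness hinges on the ORDER BY entity_key clause: itertools.groupby only merges adjacent rows, so unsorted results would silently split a key into several groups. A self-contained illustration of the grouping idiom; the row tuples are fabricated stand-ins for the (entity_key, feature_name, value, event_ts) shape returned by the query:

import itertools

# Rows must arrive sorted by entity_key (the SQL ORDER BY guarantees this).
rows = [
    (b"key-1", "conv_rate", b"\x01", "2021-08-01T00:00:00"),
    (b"key-1", "acc_rate", b"\x02", "2021-08-01T00:00:00"),
    (b"key-2", "conv_rate", b"\x03", "2021-08-01T00:00:00"),
]
grouped = {k: list(g) for k, g in itertools.groupby(rows, key=lambda r: r[0])}

assert list(grouped) == [b"key-1", b"key-2"]
assert [r[1] for r in grouped[b"key-1"]] == ["conv_rate", "acc_rate"]
assert grouped.get(b"missing-key", []) == []  # absent keys fall back to empty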
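Among the new constant attributes, env_signature fingerprints the runtime environment without exfiltrating it: the hash covers only the sorted names of non-FEAST environment variables, never their values. The recipe, restated outside the module:

import hashlib
import os

# md5 over the sorted *names* (not values) of non-FEAST environment variables,
# mirroring _constant_attributes["env_signature"] in feast/usage.py.
env_signature = hashlib.md5(
    ",".join(sorted([k for k in os.environ.keys() if not k.startswith("FEAST")])).encode()
).hexdigest()
print(env_signature)  # stable for a given environment, anonymous by construction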
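Finally, get_provider() in sdk/python/feast/infra/provider.py now resolves built-in and custom providers through a single dotted-path branch, with PROVIDERS_CLASS_FOR_TYPE mapping the shorthand names. The resolution step, sketched with the table from the patch (importer.get_class_from_type then performs the actual import):

# Built-ins resolve through the same rsplit path as plugin providers:
PROVIDERS_CLASS_FOR_TYPE = {
    "gcp": "feast.infra.gcp.GcpProvider",
    "aws": "feast.infra.aws.AwsProvider",
    "local": "feast.infra.local.LocalProvider",
}

provider = PROVIDERS_CLASS_FOR_TYPE["local"]
module_name, class_name = provider.rsplit(".", 1)
assert (module_name, class_name) == ("feast.infra.local", "LocalProvider")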