Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port telemetry to FeatureStore API #1446

Merged
merged 8 commits into from
Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 7 additions & 40 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import shutil
import uuid
import warnings
from datetime import datetime
from os.path import expanduser, join
from typing import Any, Dict, List, Optional, Union

Expand Down Expand Up @@ -73,7 +72,7 @@
)
from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.registry import Registry
from feast.telemetry import log_usage
from feast.telemetry import tele

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -364,7 +363,7 @@ def _configure_telemetry(self):
else:
self._telemetry_id = str(uuid.uuid4())
print(
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn more see https://docs.feast.dev/v/master/advanced/telemetry"
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn more see https://docs.feast.dev/v/v0.9-branch/advanced/telemetry"
)
with open(telemetry_filepath, "w") as f:
f.write(self._telemetry_id)
Expand Down Expand Up @@ -492,13 +491,7 @@ def apply(
>>> feast_client.apply(entity)
"""

if self._telemetry_enabled:
log_usage(
"apply",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("apply")
if project is None:
project = self.project

Expand Down Expand Up @@ -612,13 +605,7 @@ def get_entity(self, name: str, project: str = None) -> Entity:
none is found
"""

if self._telemetry_enabled:
log_usage(
"get_entity",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("get_entity")

if project is None:
project = self.project
Expand Down Expand Up @@ -743,13 +730,7 @@ def get_feature_table(self, name: str, project: str = None) -> FeatureTable:
none is found
"""

if self._telemetry_enabled:
log_usage(
"get_feature_table",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("get_feature_table")

if project is None:
project = self.project
Expand Down Expand Up @@ -890,13 +871,7 @@ def ingest(
>>> client.ingest(driver_ft, ft_df)
"""

if self._telemetry_enabled:
log_usage(
"ingest",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("ingest")
if project is None:
project = self.project
if isinstance(feature_table, str):
Expand Down Expand Up @@ -1021,15 +996,7 @@ def get_online_features(
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
"""

if self._telemetry_enabled:
if self._telemetry_counter["get_online_features"] % 1000 == 0:
log_usage(
"get_online_features",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
self._telemetry_counter["get_online_features"] += 1
tele.log("get_online_features")
try:
response = self._serving_service.GetOnlineFeaturesV2(
GetOnlineFeaturesRequestV2(
Expand Down
26 changes: 26 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
RepoConfig,
load_repo_config,
)
from feast.telemetry import tele
from feast.type_map import python_value_to_proto_value
from feast.version import get_version


class FeatureStore:
Expand Down Expand Up @@ -75,6 +77,11 @@ def __init__(
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)

def version(self) -> str:
"""Returns the version of the current Feast SDK/CLI"""

return get_version()

@property
def project(self) -> str:
return self.config.project
Expand All @@ -96,6 +103,7 @@ def refresh_registry(self):
greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
"""
tele.log("refresh_registry")

registry_config = self.config.get_registry_config()
self._registry = Registry(
Expand All @@ -111,6 +119,8 @@ def list_entities(self) -> List[Entity]:
Returns:
List of entities
"""
tele.log("list_entities")

return self._registry.list_entities(self.project)

def list_feature_views(self) -> List[FeatureView]:
Expand All @@ -120,6 +130,8 @@ def list_feature_views(self) -> List[FeatureView]:
Returns:
List of feature views
"""
tele.log("list_feature_views")

return self._registry.list_feature_views(self.project)

def get_entity(self, name: str) -> Entity:
Expand All @@ -133,6 +145,8 @@ def get_entity(self, name: str) -> Entity:
Returns either the specified entity, or raises an exception if
none is found
"""
tele.log("get_entity")

return self._registry.get_entity(name, self.project)

def get_feature_view(self, name: str) -> FeatureView:
Expand All @@ -146,6 +160,8 @@ def get_feature_view(self, name: str) -> FeatureView:
Returns either the specified feature view, or raises an exception if
none is found
"""
tele.log("get_feature_view")

return self._registry.get_feature_view(name, self.project)

def delete_feature_view(self, name: str):
Expand All @@ -155,6 +171,8 @@ def delete_feature_view(self, name: str):
Args:
name: Name of feature view
"""
tele.log("delete_feature_view")

return self._registry.delete_feature_view(name, self.project)

def apply(self, objects: List[Union[FeatureView, Entity]]):
Expand Down Expand Up @@ -186,6 +204,8 @@ def apply(self, objects: List[Union[FeatureView, Entity]]):
>>> fs.apply([customer_entity, customer_feature_view])
"""

tele.log("apply")

# TODO: Add locking
# TODO: Optimize by only making a single call (read/write)

Expand Down Expand Up @@ -246,6 +266,7 @@ def get_historical_features(
>>> feature_data = job.to_df()
>>> model.fit(feature_data) # insert your modeling framework here.
"""
tele.log("get_historical_features")

all_feature_views = self._registry.list_feature_views(
project=self.config.project
Expand Down Expand Up @@ -282,6 +303,8 @@ def materialize_incremental(
>>> fs = FeatureStore(config=RepoConfig(provider="gcp", registry="gs://my-fs/", project="my_fs_proj"))
>>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5))
"""
tele.log("materialize_incremental")

feature_views_to_materialize = []
if feature_views is None:
feature_views_to_materialize = self._registry.list_feature_views(
Expand Down Expand Up @@ -335,6 +358,8 @@ def materialize(
>>> start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10)
>>> )
"""
tele.log("materialize")

feature_views_to_materialize = []
if feature_views is None:
feature_views_to_materialize = self._registry.list_feature_views(
Expand Down Expand Up @@ -424,6 +449,7 @@ def get_online_features(
>>> print(online_response_dict)
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
"""
tele.log("get_online_features")

response = self._get_online_features(
feature_refs=feature_refs,
Expand Down
79 changes: 59 additions & 20 deletions sdk/python/feast/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,71 @@

import os
import sys
import uuid
from datetime import datetime
from typing import Dict
from os.path import expanduser, join

import pkg_resources
import requests

from feast.version import get_version

TELEMETRY_ENDPOINT = (
"https://us-central1-kf-feast.cloudfunctions.net/bq_telemetry_logger"
)


def log_usage(
function_name: str,
telemetry_id: str,
timestamp: datetime,
version: Dict[str, Dict[str, str]],
):
json = {
"function_name": function_name,
"telemetry_id": telemetry_id,
"timestamp": timestamp.isoformat(),
"version": version,
"os": sys.platform,
"is_test": os.getenv("FEAST_IS_TELEMETRY_TEST", "False"),
}
try:
requests.post(TELEMETRY_ENDPOINT, json=json)
except Exception:
pass
return
try:
sdk_version = pkg_resources.get_distribution("feast").version
except pkg_resources.DistributionNotFound:
sdk_version = "local build"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was accidentally left in fyi



class Telemetry:
def __init__(self):
telemetry_filepath = join(expanduser("~"), ".feast", "telemetry")

self._telemetry_enabled = (
os.getenv("FEAST_TELEMETRY", default="True") == "True"
) # written this way to turn the env var string into a boolean

if self._telemetry_enabled:
self._telemetry_counter = {"get_online_features": 0}
if os.path.exists(telemetry_filepath):
with open(telemetry_filepath, "r") as f:
self._telemetry_id = f.read()
else:
self._telemetry_id = str(uuid.uuid4())
print(
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn more see https://docs.feast.dev/v/master/feast-on-kubernetes/advanced-1/telemetry"
)
with open(telemetry_filepath, "w") as f:
f.write(self._telemetry_id)
else:
if os.path.exists(telemetry_filepath):
os.remove(telemetry_filepath)

def log(self, function_name: str):

if self._telemetry_enabled and self._telemetry_id:
if function_name == "get_online_features":
if self._telemetry_counter["get_online_features"] % 10000 != 0:
self._telemetry_counter["get_online_features"] += 1
return

json = {
"function_name": function_name,
"telemetry_id": self._telemetry_id,
"timestamp": datetime.utcnow(),
"version": get_version(),
"os": sys.platform,
"is_test": os.getenv("FEAST_IS_TELEMETRY_TEST", "False"),
}
try:
requests.post(TELEMETRY_ENDPOINT, json=json)
except Exception:
pass
return


tele = Telemetry()
13 changes: 13 additions & 0 deletions sdk/python/feast/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pkg_resources


def get_version():
"""
Returns version information of the Feast Python Package
"""

try:
sdk_version = pkg_resources.get_distribution("feast").version
except pkg_resources.DistributionNotFound:
sdk_version = "unknown"
return sdk_version
Loading