Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port telemetry to FeatureStore API #1446

Merged
merged 8 commits into from
Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 6 additions & 39 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import shutil
import uuid
import warnings
from datetime import datetime
from os.path import expanduser, join
from typing import Any, Dict, List, Optional, Union

Expand Down Expand Up @@ -73,7 +72,7 @@
)
from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.registry import Registry
from feast.telemetry import log_usage
from feast.telemetry import tele

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -492,13 +491,7 @@ def apply(
>>> feast_client.apply(entity)
"""

if self._telemetry_enabled:
log_usage(
"apply",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("apply")
if project is None:
project = self.project

Expand Down Expand Up @@ -612,13 +605,7 @@ def get_entity(self, name: str, project: str = None) -> Entity:
none is found
"""

if self._telemetry_enabled:
log_usage(
"get_entity",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("get_entity")

if project is None:
project = self.project
Expand Down Expand Up @@ -743,13 +730,7 @@ def get_feature_table(self, name: str, project: str = None) -> FeatureTable:
none is found
"""

if self._telemetry_enabled:
log_usage(
"get_feature_table",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("get_feature_table")

if project is None:
project = self.project
Expand Down Expand Up @@ -890,13 +871,7 @@ def ingest(
>>> client.ingest(driver_ft, ft_df)
"""

if self._telemetry_enabled:
log_usage(
"ingest",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
tele.log("ingest")
if project is None:
project = self.project
if isinstance(feature_table, str):
Expand Down Expand Up @@ -1021,15 +996,7 @@ def get_online_features(
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
"""

if self._telemetry_enabled:
if self._telemetry_counter["get_online_features"] % 1000 == 0:
log_usage(
"get_online_features",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
self._telemetry_counter["get_online_features"] += 1
tele.log("get_online_features")
try:
response = self._serving_service.GetOnlineFeaturesV2(
GetOnlineFeaturesRequestV2(
Expand Down
117 changes: 15 additions & 102 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import uuid
from collections import OrderedDict, defaultdict
from datetime import datetime, timedelta
from os.path import expanduser, join
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

Expand All @@ -40,8 +37,9 @@
RepoConfig,
load_repo_config,
)
from feast.telemetry import log_usage
from feast.telemetry import tele
from feast.type_map import python_value_to_proto_value
from feast.version import get_version


class FeatureStore:
Expand Down Expand Up @@ -78,40 +76,11 @@ def __init__(
registry_path=registry_config.path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._configure_telemetry()

def _configure_telemetry(self):
telemetry_filepath = join(expanduser("~"), ".feast", "telemetry")
self._telemetry_enabled = (
os.getenv("FEAST_TELEMETRY", default="True") == "True"
) # written this way to turn the env var string into a boolean
if self._telemetry_enabled:
self._telemetry_counter = {"get_online_features": 0}
if os.path.exists(telemetry_filepath):
with open(telemetry_filepath, "r") as f:
self._telemetry_id = f.read()
else:
self._telemetry_id = str(uuid.uuid4())
print(
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn more see https://docs.feast.dev/v/master/feast-on-kubernetes/advanced-1/telemetry"
)
with open(telemetry_filepath, "w") as f:
f.write(self._telemetry_id)
else:
if os.path.exists(telemetry_filepath):
os.remove(telemetry_filepath)

def version(self) -> str:
"""
Returns version information of the Feast SDK
"""
import pkg_resources
"""Returns the version of the current Feast SDK/CLI"""

try:
sdk_version = pkg_resources.get_distribution("feast").version
except pkg_resources.DistributionNotFound:
sdk_version = "local build"
return sdk_version
return get_version()

@property
def project(self) -> str:
Expand All @@ -134,13 +103,7 @@ def refresh_registry(self):
greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
"""
if self._telemetry_enabled:
log_usage(
"refresh_registry",
self._telemetry_id,
datetime.utcnow(),
self.version(),
)
tele.log("refresh_registry")

registry_config = self.config.get_registry_config()
self._registry = Registry(
Expand All @@ -156,10 +119,7 @@ def list_entities(self) -> List[Entity]:
Returns:
List of entities
"""
if self._telemetry_enabled:
log_usage(
"list_entities", self._telemetry_id, datetime.utcnow(), self.version(),
)
tele.log("list_entities")

return self._registry.list_entities(self.project)

Expand All @@ -170,13 +130,7 @@ def list_feature_views(self) -> List[FeatureView]:
Returns:
List of feature views
"""
if self._telemetry_enabled:
log_usage(
"list_feature_views",
self._telemetry_id,
datetime.utcnow(),
self.version(),
)
tele.log("list_feature_views")

return self._registry.list_feature_views(self.project)

Expand All @@ -191,10 +145,7 @@ def get_entity(self, name: str) -> Entity:
Returns either the specified entity, or raises an exception if
none is found
"""
if self._telemetry_enabled:
log_usage(
"get_entity", self._telemetry_id, datetime.utcnow(), self.version(),
)
tele.log("get_entity")

return self._registry.get_entity(name, self.project)

Expand All @@ -209,13 +160,7 @@ def get_feature_view(self, name: str) -> FeatureView:
Returns either the specified feature view, or raises an exception if
none is found
"""
if self._telemetry_enabled:
log_usage(
"get_feature_view",
self._telemetry_id,
datetime.utcnow(),
self.version(),
)
tele.log("get_feature_view")

return self._registry.get_feature_view(name, self.project)

Expand All @@ -226,13 +171,7 @@ def delete_feature_view(self, name: str):
Args:
name: Name of feature view
"""
if self._telemetry_enabled:
log_usage(
"delete_feature_view",
self._telemetry_id,
datetime.utcnow(),
self.version(),
)
tele.log("delete_feature_view")

return self._registry.delete_feature_view(name, self.project)

Expand Down Expand Up @@ -265,10 +204,7 @@ def apply(self, objects: List[Union[FeatureView, Entity]]):
>>> fs.apply([customer_entity, customer_feature_view])
"""

if self._telemetry_enabled:
log_usage(
"apply", self._telemetry_id, datetime.utcnow(), self.version(),
)
tele.log("apply")

# TODO: Add locking
# TODO: Optimize by only making a single call (read/write)
Expand Down Expand Up @@ -330,13 +266,7 @@ def get_historical_features(
>>> feature_data = job.to_df()
>>> model.fit(feature_data) # insert your modeling framework here.
"""
if self._telemetry_enabled:
log_usage(
"get_historical_features",
self._telemetry_id,
datetime.utcnow(),
self.version(),
)
tele.log("get_historical_features")

all_feature_views = self._registry.list_feature_views(
project=self.config.project
Expand Down Expand Up @@ -373,13 +303,7 @@ def materialize_incremental(
>>> fs = FeatureStore(config=RepoConfig(provider="gcp", registry="gs://my-fs/", project="my_fs_proj"))
>>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5))
"""
if self._telemetry_enabled:
log_usage(
"materialize_incremental",
self._telemetry_id,
datetime.utcnow(),
self.version(),
)
tele.log("materialize_incremental")

feature_views_to_materialize = []
if feature_views is None:
Expand Down Expand Up @@ -434,10 +358,7 @@ def materialize(
>>> start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10)
>>> )
"""
if self._telemetry_enabled:
log_usage(
"materialize", self._telemetry_id, datetime.utcnow(), self.version(),
)
tele.log("materialize")

feature_views_to_materialize = []
if feature_views is None:
Expand Down Expand Up @@ -528,15 +449,7 @@ def get_online_features(
>>> print(online_response_dict)
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
"""
if self._telemetry_enabled:
if self._telemetry_counter["get_online_features"] % 1000 == 0:
log_usage(
"get_online_features",
self._telemetry_id,
datetime.utcnow(),
self.version(),
)
self._telemetry_counter["get_online_features"] += 1
tele.log("get_online_features")

response = self._get_online_features(
feature_refs=feature_refs,
Expand Down
Loading