diff --git a/.github/workflows/code_standards.yaml b/.github/workflows/code_standards.yaml index 45731e12d7..92ff8effee 100644 --- a/.github/workflows/code_standards.yaml +++ b/.github/workflows/code_standards.yaml @@ -18,6 +18,8 @@ jobs: - uses: actions/checkout@v2 - name: install dependencies run: make install-python-ci-dependencies + - name: compile protos + run: make compile-protos-python - name: lint python run: make lint-python diff --git a/Makefile b/Makefile index 93b9d5240d..0e6aab30ef 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,12 @@ -# +# # Copyright 2019 The Feast Authors -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -73,8 +73,7 @@ format-python: cd ${ROOT_DIR}/sdk/python; black --target-version py37 feast tests lint-python: - # TODO: This mypy test needs to be re-enabled and all failures fixed - #cd ${ROOT_DIR}/sdk/python; mypy feast/ tests/ + cd ${ROOT_DIR}/sdk/python; mypy feast/ tests/ cd ${ROOT_DIR}/sdk/python; flake8 feast/ tests/ cd ${ROOT_DIR}/sdk/python; black --check feast tests @@ -104,7 +103,7 @@ build-push-docker: @$(MAKE) push-core-docker registry=$(REGISTRY) version=$(VERSION) @$(MAKE) push-serving-docker registry=$(REGISTRY) version=$(VERSION) @$(MAKE) push-ci-docker registry=$(REGISTRY) - + build-docker: build-core-docker build-serving-docker build-ci-docker push-core-docker: diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 9eb356034a..77976fa41d 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -15,6 +15,7 @@ import json import logging import sys +from typing import Dict, List import click import pkg_resources @@ -120,28 +121,12 @@ def feature(): pass -def _get_features_labels_dict(label_str: str): - """ - Converts CLI input labels string to dictionary format if provided string is valid. - """ - labels_dict = {} - labels_kv = label_str.split(",") - if label_str == "": - return labels_dict - if len(labels_kv) % 2 == 1: - raise ValueError("Uneven key-value label pairs were entered") - for k, v in zip(labels_kv[0::2], labels_kv[1::2]): - labels_dict[k] = v - return labels_dict - - -def _get_features_entities(entities_str: str): +def _convert_entity_string_to_list(entities_str: str) -> List[str]: """ Converts CLI input entities string to list format if provided string is valid. """ - entities_list = [] if entities_str == "": - return entities_list + return [] return entities_str.split(",") @@ -173,8 +158,8 @@ def feature_list(project: str, entities: str, labels: str): """ feast_client = Client() # type: Client - entities_list = _get_features_entities(entities) - labels_dict = _get_features_labels_dict(labels) + entities_list = _convert_entity_string_to_list(entities) + labels_dict: Dict[str, str] = _get_labels_dict(labels) table = [] for feature_ref, feature in feast_client.list_features_by_ref( @@ -195,11 +180,11 @@ def feature_set(): pass -def _get_labels_dict(label_str: str): +def _get_labels_dict(label_str: str) -> Dict[str, str]: """ Converts CLI input labels string to dictionary format if provided string is valid. """ - labels_dict = {} + labels_dict: Dict[str, str] = {} labels_kv = label_str.split(",") if label_str == "": return labels_dict diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 3a7dfb0b78..f4c9994ae7 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -90,7 +90,7 @@ _logger = logging.getLogger(__name__) -CPU_COUNT = os.cpu_count() # type: int +CPU_COUNT: int = len(os.sched_getaffinity(0)) class Client: @@ -123,9 +123,9 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): options = dict() self._config = Config(options={**options, **kwargs}) - self._core_service_stub: CoreServiceStub = None - self._serving_service_stub: ServingServiceStub = None - self._auth_metadata = None + self._core_service_stub: Optional[CoreServiceStub] = None + self._serving_service_stub: Optional[ServingServiceStub] = None + self._auth_metadata: Optional[grpc.AuthMetadataPlugin] = None # Configure Auth Metadata Plugin if auth is enabled if self._config.getboolean(CONFIG_CORE_ENABLE_AUTH_KEY): @@ -475,7 +475,7 @@ def get_feature_set( raise ValueError("No project has been configured.") try: - get_feature_set_response = self._core_service_stub.GetFeatureSet( + get_feature_set_response = self._core_service.GetFeatureSet( GetFeatureSetRequest(project=project, name=name.strip()), metadata=self._get_grpc_metadata(), ) # type: GetFeatureSetResponse @@ -719,9 +719,9 @@ def list_ingest_jobs( ) request = ListIngestionJobsRequest(filter=list_filter) # make list request & unpack response - response = self._core_service_stub.ListIngestionJobs(request) + response = self._core_service_stub.ListIngestionJobs(request) # type: ignore ingest_jobs = [ - IngestJob(proto, self._core_service_stub) for proto in response.jobs + IngestJob(proto, self._core_service_stub) for proto in response.jobs # type: ignore ] return ingest_jobs @@ -737,7 +737,7 @@ def restart_ingest_job(self, job: IngestJob): """ request = RestartIngestionJobRequest(id=job.id) try: - self._core_service_stub.RestartIngestionJob(request) + self._core_service.RestartIngestionJob(request) # type: ignore except grpc.RpcError as e: raise grpc.RpcError(e.details()) @@ -753,7 +753,7 @@ def stop_ingest_job(self, job: IngestJob): """ request = StopIngestionJobRequest(id=job.id) try: - self._core_service_stub.StopIngestionJob(request) + self._core_service.StopIngestionJob(request) # type: ignore except grpc.RpcError as e: raise grpc.RpcError(e.details()) @@ -817,11 +817,12 @@ def ingest( while True: if timeout is not None and time.time() - current_time >= timeout: raise TimeoutError("Timed out waiting for feature set to be ready") - feature_set = self.get_feature_set(name) + fetched_feature_set: Optional[FeatureSet] = self.get_feature_set(name) if ( - feature_set is not None - and feature_set.status == FeatureSetStatus.STATUS_READY + fetched_feature_set is not None + and fetched_feature_set.status == FeatureSetStatus.STATUS_READY ): + feature_set = fetched_feature_set break time.sleep(3) @@ -944,7 +945,7 @@ def get_statistics( if end_date is not None: request.end_date.CopyFrom(Timestamp(seconds=int(end_date.timestamp()))) - return self._core_service_stub.GetFeatureStatistics( + return self._core_service.GetFeatureStatistics( request ).dataset_feature_statistics_list diff --git a/sdk/python/feast/feature_set.py b/sdk/python/feast/feature_set.py index ee06463bc6..848233925a 100644 --- a/sdk/python/feast/feature_set.py +++ b/sdk/python/feast/feature_set.py @@ -22,6 +22,7 @@ from google.protobuf.duration_pb2 import Duration from google.protobuf.json_format import MessageToDict, MessageToJson from google.protobuf.message import Message +from google.protobuf.timestamp_pb2 import Timestamp from pandas.api.types import is_datetime64_ns_dtype from pyarrow.lib import TimestampType @@ -62,7 +63,7 @@ def __init__( self._project = project self._fields = OrderedDict() # type: Dict[str, Field] if features is not None: - self.features = features + self.features: Optional[List[Feature]] = features if entities is not None: self.entities = entities if source is None: @@ -70,12 +71,12 @@ def __init__( else: self._source = source if labels is None: - self._labels = OrderedDict() + self._labels = OrderedDict() # type: MutableMapping[str, str] else: self._labels = labels self._max_age = max_age self._status = None - self._created_timestamp = None + self._created_timestamp: Optional[Timestamp] = None def __eq__(self, other): if not isinstance(other, FeatureSet): @@ -314,7 +315,7 @@ def drop(self, name: str): """ del self._fields[name] - def _add_fields(self, fields: List[Field]): + def _add_fields(self, fields): """ Adds multiple Fields to a Feature Set @@ -379,8 +380,9 @@ def infer_fields_from_df( # Create dictionary of fields that will not be inferred (manually set) provided_fields = OrderedDict() + fields = _create_field_list(entities, features) - for field in entities + features: + for field in fields: if not isinstance(field, Field): raise Exception(f"Invalid field object type provided {type(field)}") if field.name not in provided_fields: @@ -518,8 +520,9 @@ def infer_fields_from_pa( # Create dictionary of fields that will not be inferred (manually set) provided_fields = OrderedDict() + fields = _create_field_list(entities, features) - for field in entities + features: + for field in fields: if not isinstance(field, Field): raise Exception(f"Invalid field object type provided {type(field)}") if field.name not in provided_fields: @@ -835,7 +838,7 @@ def from_proto(cls, feature_set_proto: FeatureSetProto): if len(feature_set_proto.spec.project) == 0 else feature_set_proto.spec.project, ) - feature_set._status = feature_set_proto.meta.status + feature_set._status = feature_set_proto.meta.status # type: ignore feature_set._created_timestamp = feature_set_proto.meta.created_timestamp return feature_set @@ -1041,3 +1044,29 @@ def _infer_pd_column_type(column, series, rows_to_sample): dtype = current_dtype return dtype + + +def _create_field_list(entities: List[Entity], features: List[Feature]) -> List[Field]: + """ + Convert entities and features List to Field List + + Args: + entities: List of Entity Objects + features: List of Features Objects + + + Returns: + List[Field]: + List of field from entities and features combined + """ + fields: List[Field] = [] + + for entity in entities: + if isinstance(entity, Field): + fields.append(entity) + + for feature in features: + if isinstance(feature, Field): + fields.append(feature) + + return fields diff --git a/sdk/python/feast/field.py b/sdk/python/feast/field.py index 6c015588b6..2f54e82d6f 100644 --- a/sdk/python/feast/field.py +++ b/sdk/python/feast/field.py @@ -14,7 +14,7 @@ from collections import OrderedDict from typing import MutableMapping, Optional, Union -from feast.core.FeatureSet_pb2 import EntitySpec, FeatureSpec +from feast.core.FeatureSet_pb2 import FeatureSpec from feast.value_type import ValueType from tensorflow_metadata.proto.v0 import schema_pb2 @@ -36,25 +36,25 @@ def __init__( raise ValueError("dtype is not a valid ValueType") self._dtype = dtype if labels is None: - self._labels = OrderedDict() + self._labels = OrderedDict() # type: MutableMapping else: self._labels = labels - self._presence = None - self._group_presence = None - self._shape = None - self._value_count = None - self._domain = None - self._int_domain = None - self._float_domain = None - self._string_domain = None - self._bool_domain = None - self._struct_domain = None - self._natural_language_domain = None - self._image_domain = None - self._mid_domain = None - self._url_domain = None - self._time_domain = None - self._time_of_day_domain = None + self._presence: Optional[schema_pb2.FeaturePresence] = None + self._group_presence: Optional[schema_pb2.FeaturePresenceWithinGroup] = None + self._shape: Optional[schema_pb2.FixedShape] = None + self._value_count: Optional[schema_pb2.ValueCount] = None + self._domain: Optional[str] = None + self._int_domain: Optional[schema_pb2.IntDomain] = None + self._float_domain: Optional[schema_pb2.FloatDomain] = None + self._string_domain: Optional[schema_pb2.StringDomain] = None + self._bool_domain: Optional[schema_pb2.BoolDomain] = None + self._struct_domain: Optional[schema_pb2.StructDomain] = None + self._natural_language_domain: Optional[schema_pb2.NaturalLanguageDomain] = None + self._image_domain: Optional[schema_pb2.ImageDomain] = None + self._mid_domain: Optional[schema_pb2.MIDDomain] = None + self._url_domain: Optional[schema_pb2.URLDomain] = None + self._time_domain: Optional[schema_pb2.TimeDomain] = None + self._time_of_day_domain: Optional[schema_pb2.TimeOfDayDomain] = None def __eq__(self, other): if ( @@ -87,7 +87,7 @@ def labels(self) -> MutableMapping[str, str]: return self._labels @property - def presence(self) -> schema_pb2.FeaturePresence: + def presence(self) -> Optional[schema_pb2.FeaturePresence]: """ Getter for presence of this field """ @@ -104,7 +104,7 @@ def presence(self, presence: schema_pb2.FeaturePresence): self._presence = presence @property - def group_presence(self) -> schema_pb2.FeaturePresenceWithinGroup: + def group_presence(self) -> Optional[schema_pb2.FeaturePresenceWithinGroup]: """ Getter for group_presence of this field """ @@ -121,7 +121,7 @@ def group_presence(self, group_presence: schema_pb2.FeaturePresenceWithinGroup): self._group_presence = group_presence @property - def shape(self) -> schema_pb2.FixedShape: + def shape(self) -> Optional[schema_pb2.FixedShape]: """ Getter for shape of this field """ @@ -138,7 +138,7 @@ def shape(self, shape: schema_pb2.FixedShape): self._shape = shape @property - def value_count(self) -> schema_pb2.ValueCount: + def value_count(self) -> Optional[schema_pb2.ValueCount]: """ Getter for value_count of this field """ @@ -155,7 +155,7 @@ def value_count(self, value_count: schema_pb2.ValueCount): self._value_count = value_count @property - def domain(self) -> str: + def domain(self) -> Optional[str]: """ Getter for domain of this field """ @@ -172,7 +172,7 @@ def domain(self, domain: str): self._domain = domain @property - def int_domain(self) -> schema_pb2.IntDomain: + def int_domain(self) -> Optional[schema_pb2.IntDomain]: """ Getter for int_domain of this field """ @@ -189,7 +189,7 @@ def int_domain(self, int_domain: schema_pb2.IntDomain): self._int_domain = int_domain @property - def float_domain(self) -> schema_pb2.FloatDomain: + def float_domain(self) -> Optional[schema_pb2.FloatDomain]: """ Getter for float_domain of this field """ @@ -206,7 +206,7 @@ def float_domain(self, float_domain: schema_pb2.FloatDomain): self._float_domain = float_domain @property - def string_domain(self) -> schema_pb2.StringDomain: + def string_domain(self) -> Optional[schema_pb2.StringDomain]: """ Getter for string_domain of this field """ @@ -223,7 +223,7 @@ def string_domain(self, string_domain: schema_pb2.StringDomain): self._string_domain = string_domain @property - def bool_domain(self) -> schema_pb2.BoolDomain: + def bool_domain(self) -> Optional[schema_pb2.BoolDomain]: """ Getter for bool_domain of this field """ @@ -240,7 +240,7 @@ def bool_domain(self, bool_domain: schema_pb2.BoolDomain): self._bool_domain = bool_domain @property - def struct_domain(self) -> schema_pb2.StructDomain: + def struct_domain(self) -> Optional[schema_pb2.StructDomain]: """ Getter for struct_domain of this field """ @@ -257,7 +257,7 @@ def struct_domain(self, struct_domain: schema_pb2.StructDomain): self._struct_domain = struct_domain @property - def natural_language_domain(self) -> schema_pb2.NaturalLanguageDomain: + def natural_language_domain(self) -> Optional[schema_pb2.NaturalLanguageDomain]: """ Getter for natural_language_domain of this field """ @@ -278,7 +278,7 @@ def natural_language_domain( self._natural_language_domain = natural_language_domain @property - def image_domain(self) -> schema_pb2.ImageDomain: + def image_domain(self) -> Optional[schema_pb2.ImageDomain]: """ Getter for image_domain of this field """ @@ -295,7 +295,7 @@ def image_domain(self, image_domain: schema_pb2.ImageDomain): self._image_domain = image_domain @property - def mid_domain(self) -> schema_pb2.MIDDomain: + def mid_domain(self) -> Optional[schema_pb2.MIDDomain]: """ Getter for mid_domain of this field """ @@ -312,7 +312,7 @@ def mid_domain(self, mid_domain: schema_pb2.MIDDomain): self._mid_domain = mid_domain @property - def url_domain(self) -> schema_pb2.URLDomain: + def url_domain(self) -> Optional[schema_pb2.URLDomain]: """ Getter for url_domain of this field """ @@ -329,7 +329,7 @@ def url_domain(self, url_domain: schema_pb2.URLDomain): self.url_domain = url_domain @property - def time_domain(self) -> schema_pb2.TimeDomain: + def time_domain(self) -> Optional[schema_pb2.TimeDomain]: """ Getter for time_domain of this field """ @@ -346,7 +346,7 @@ def time_domain(self, time_domain: schema_pb2.TimeDomain): self._time_domain = time_domain @property - def time_of_day_domain(self) -> schema_pb2.TimeOfDayDomain: + def time_of_day_domain(self) -> Optional[schema_pb2.TimeOfDayDomain]: """ Getter for time_of_day_domain of this field """ @@ -363,14 +363,14 @@ def time_of_day_domain(self, time_of_day_domain): self._time_of_day_domain = time_of_day_domain def update_presence_constraints( - self, feature: Union[schema_pb2.Feature, EntitySpec, FeatureSpec] + self, feature: Union[schema_pb2.Feature, FeatureSpec] ) -> None: """ - Update the presence constraints in this field from Tensorflow Feature, - Feast EntitySpec or FeatureSpec + Update the presence constraints in this field from Tensorflow Feature or + Feast FeatureSpec Args: - feature: Tensorflow Feature, Feast EntitySpec or FeatureSpec + feature: Tensorflow Feature or Feast FeatureSpec Returns: None """ @@ -381,14 +381,14 @@ def update_presence_constraints( self.group_presence = feature.group_presence def update_shape_type( - self, feature: Union[schema_pb2.Feature, EntitySpec, FeatureSpec] + self, feature: Union[schema_pb2.Feature, FeatureSpec] ) -> None: """ - Update the shape type in this field from Tensorflow Feature, - Feast EntitySpec or FeatureSpec + Update the shape type in this field from Tensorflow Feature or + Feast FeatureSpec Args: - feature: Tensorflow Feature, Feast EntitySpec or FeatureSpec + feature: Tensorflow Feature or Feast FeatureSpec Returns: None """ @@ -399,14 +399,13 @@ def update_shape_type( self.value_count = feature.value_count def update_domain_info( - self, feature: Union[schema_pb2.Feature, EntitySpec, FeatureSpec] + self, feature: Union[schema_pb2.Feature, FeatureSpec] ) -> None: """ - Update the domain info in this field from Tensorflow Feature, Feast EntitySpec - or FeatureSpec + Update the domain info in this field from Tensorflow Feature or Feast FeatureSpec Args: - feature: Tensorflow Feature, Feast EntitySpec or FeatureSpec + feature: Tensorflow Feature or Feast FeatureSpec Returns: None """ diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py index 0ec08d3365..f3e50cff4d 100644 --- a/sdk/python/feast/grpc/auth.py +++ b/sdk/python/feast/grpc/auth.py @@ -31,7 +31,7 @@ ) -def get_auth_metadata_plugin(config: Config): +def get_auth_metadata_plugin(config: Config) -> grpc.AuthMetadataPlugin: """ Get an Authentication Metadata Plugin. This plugin is used in gRPC to sign requests. Please see the following URL for more details diff --git a/sdk/python/feast/job.py b/sdk/python/feast/job.py index 0ddd89b7ed..414139e0e5 100644 --- a/sdk/python/feast/job.py +++ b/sdk/python/feast/job.py @@ -178,8 +178,11 @@ def to_chunked_dataframe( pd.DataFrame: Pandas DataFrame of the feature values. """ + + # Object is Avro row type object, refer to self.result function for this type + records: List[dict] = [] + # Max chunk size defined by user - records = [] for result in self.result(timeout_sec=timeout_sec): result.append(records) if len(records) == max_chunk_size: @@ -257,7 +260,7 @@ def external_id(self) -> str: return self.proto.external_id @property - def status(self) -> IngestionJobStatus: + def status(self) -> IngestionJobStatus: # type: ignore """ Getter for IngestJob's status """ @@ -280,13 +283,13 @@ def source(self) -> Source: return Source.from_proto(self.proto.source) @property - def store(self) -> Store: + def stores(self) -> List[Store]: """ Getter for the IngestJob's target feast store. """ - return self.proto.store + return list(self.proto.stores) - def wait(self, status: IngestionJobStatus, timeout_secs: float = 300): + def wait(self, status: IngestionJobStatus, timeout_secs: int = 300): # type: ignore """ Wait for this IngestJob to transtion to the given status. Raises TimeoutError if the wait operation times out. @@ -297,7 +300,7 @@ def wait(self, status: IngestionJobStatus, timeout_secs: float = 300): """ # poll & wait for job status to transition wait_retry_backoff( - retry_fn=(lambda: (None, self.status == status)), + retry_fn=(lambda: (None, self.status == status)), # type: ignore timeout_secs=timeout_secs, timeout_msg="Wait for IngestJob's status to transition timed out", ) diff --git a/sdk/python/feast/loaders/file.py b/sdk/python/feast/loaders/file.py index 52cc8ae7dc..5fffd62ea3 100644 --- a/sdk/python/feast/loaders/file.py +++ b/sdk/python/feast/loaders/file.py @@ -73,7 +73,7 @@ def export_source_to_staging_location( ) elif urlparse(source).scheme in ["", "file"]: # Local file provided as a source - dir_path = None + dir_path = "" file_name = os.path.basename(source) source_path = os.path.abspath( os.path.join(urlparse(source).netloc, urlparse(source).path) @@ -83,7 +83,9 @@ def export_source_to_staging_location( input_source_uri = urlparse(source) if "*" in source: # Wildcard path - return _get_files(bucket=input_source_uri.hostname, uri=input_source_uri) + return _get_files( + bucket=str(input_source_uri.hostname), uri=input_source_uri + ) else: return [source] else: @@ -96,7 +98,7 @@ def export_source_to_staging_location( if uri.scheme == "gs": # Staging location is a Google Cloud Storage path upload_file_to_gcs( - source_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name + source_path, str(uri.hostname), str(uri.path).strip("/") + "/" + file_name ) elif uri.scheme == "file": # Staging location is a file path @@ -109,7 +111,7 @@ def export_source_to_staging_location( ) # Clean up, remove local staging file - if dir_path and isinstance(source, pd.DataFrame) and len(str(dir_path)) > 4: + if isinstance(source, pd.DataFrame) and len(str(dir_path)) > 4: shutil.rmtree(dir_path) return [staging_location_uri.rstrip("/") + "/" + file_name] @@ -180,8 +182,8 @@ def upload_file_to_gcs(local_path: str, bucket: str, remote_path: str) -> None: """ storage_client = storage.Client(project=None) - bucket = storage_client.get_bucket(bucket) - blob = bucket.blob(remote_path) + bucket_storage = storage_client.get_bucket(bucket) + blob = bucket_storage.blob(remote_path) blob.upload_from_filename(local_path) @@ -206,12 +208,12 @@ def _get_files(bucket: str, uri: ParseResult) -> List[str]: """ storage_client = storage.Client(project=None) - bucket = storage_client.get_bucket(bucket) + bucket_storage = storage_client.get_bucket(bucket) path = uri.path if "*" in path: regex = re.compile(path.replace("*", ".*?").strip("/")) - blob_list = bucket.list_blobs( + blob_list = bucket_storage.list_blobs( prefix=path.strip("/").split("*")[0], delimiter="/" ) # File path should not be in path (file path must be longer than path) diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py index b439dbd302..7fafc9061a 100644 --- a/sdk/python/feast/loaders/ingest.py +++ b/sdk/python/feast/loaders/ingest.py @@ -74,7 +74,7 @@ def _encode_pa_tables( } # List to store result - feature_rows = [] + feature_rows: List[bytes] = [] # Loop optimization declaration(s) field = FieldProto.Field diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 85def25fcb..56757e9ef9 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -13,7 +13,7 @@ # limitations under the License. from datetime import datetime, timezone -from typing import List +from typing import Any, Dict, List, Union import numpy as np import pandas as pd @@ -422,7 +422,7 @@ def pa_to_value_type(pa_type: object): return type_map[pa_type.__str__()] -def pa_to_feast_value_type(value: object) -> ValueType: +def pa_to_feast_value_type(value: pa.lib.ChunkedArray) -> ValueType: type_map = { "timestamp[ms]": ValueType.INT64, "int32": ValueType.INT32, @@ -443,7 +443,7 @@ def pa_to_feast_value_type(value: object) -> ValueType: return type_map[value.type.__str__()] -def pa_column_to_timestamp_proto_column(column: pa.lib.ChunkedArray) -> Timestamp: +def pa_column_to_timestamp_proto_column(column: pa.lib.ChunkedArray) -> List[Timestamp]: if not isinstance(column.type, TimestampType): raise Exception("Only TimestampType columns are allowed") @@ -456,9 +456,9 @@ def pa_column_to_timestamp_proto_column(column: pa.lib.ChunkedArray) -> Timestam def pa_column_to_proto_column( - feast_value_type, column: pa.lib.ChunkedArray + feast_value_type: ValueType, column: pa.lib.ChunkedArray ) -> List[ProtoValue]: - type_map = { + type_map: Dict[ValueType, Union[str, Dict[str, Any]]] = { ValueType.INT32: "int32_val", ValueType.INT64: "int64_val", ValueType.FLOAT: "float_val", @@ -475,9 +475,9 @@ def pa_column_to_proto_column( ValueType.INT64_LIST: {"int64_list_val": Int64List}, } - value = type_map[feast_value_type] + value: Union[str, Dict[str, Any]] = type_map[feast_value_type] # Process list types - if type(value) == dict: + if isinstance(value, dict): list_param_name = list(value.keys())[0] return [ ProtoValue(**{list_param_name: value[list_param_name](val=x.as_py())}) diff --git a/sdk/python/feast/value_type.py b/sdk/python/feast/value_type.py index 687dccc7b7..aaf3de1822 100644 --- a/sdk/python/feast/value_type.py +++ b/sdk/python/feast/value_type.py @@ -38,7 +38,7 @@ class ValueType(enum.Enum): FLOAT_LIST = 16 BOOL_LIST = 17 - def to_tfx_schema_feature_type(self) -> schema_pb2.FeatureType: + def to_tfx_schema_feature_type(self): if self.value in [ ValueType.BYTES.value, ValueType.STRING.value, diff --git a/sdk/python/feast/wait.py b/sdk/python/feast/wait.py index f25036d76e..c32897606e 100644 --- a/sdk/python/feast/wait.py +++ b/sdk/python/feast/wait.py @@ -21,9 +21,9 @@ def wait_retry_backoff( retry_fn: Callable[[], Tuple[Any, bool]], - timeout_secs: Optional[int] = None, + timeout_secs: int = 0, timeout_msg: Optional[str] = "Timeout while waiting for retry_fn() to return True", - max_interval_secs: Optional[int] = int(defaults[CONFIG_MAX_WAIT_INTERVAL_KEY]), + max_interval_secs: int = int(defaults[CONFIG_MAX_WAIT_INTERVAL_KEY]), ) -> Any: """ Repeatedly try calling given retry_fn until it returns a True boolean success flag. @@ -31,16 +31,16 @@ def wait_retry_backoff( Args: retry_fn: Callable that returns a result and a boolean success flag. timeout_secs: timeout in seconds to give up retrying and throw TimeoutError, - or None to retry perpectually. + or 0 to retry perpetually. timeout_msg: Message to use when throwing TimeoutError. max_interval_secs: max wait in seconds to wait between retries. Returns: Returned Result from retry_fn() if success flag is True. """ - wait_secs, elapsed_secs = 1, 0 + wait_secs, elapsed_secs = 1.0, 0.0 result, is_success = retry_fn() wait_begin = time.time() - while not is_success and elapsed_secs <= timeout_secs: + while not is_success and (elapsed_secs <= timeout_secs or timeout_secs == 0): # back off wait duration exponentially, capped at MAX_WAIT_INTERVAL_SEC elapsed_secs = time.time() - wait_begin till_timeout_secs = timeout_secs - elapsed_secs diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt index 45aff4788b..03abbb57c3 100644 --- a/sdk/python/requirements-ci.txt +++ b/sdk/python/requirements-ci.txt @@ -10,4 +10,6 @@ pytest-timeout pytest-ordering==0.6.* pandas==0.* mock==2.0.0 -pandavro==1.5.* \ No newline at end of file +pandavro==1.5.* +mypy +mypy-protobuf diff --git a/sdk/python/tests/feast_serving_server.py b/sdk/python/tests/feast_serving_server.py index 983e74e885..d121d387dc 100644 --- a/sdk/python/tests/feast_serving_server.py +++ b/sdk/python/tests/feast_serving_server.py @@ -4,20 +4,12 @@ from typing import Dict import grpc -from google.protobuf.timestamp_pb2 import Timestamp import feast.serving.ServingService_pb2_grpc as Serving from feast.core import FeatureSet_pb2 as FeatureSetProto from feast.core.CoreService_pb2 import ListFeatureSetsResponse from feast.core.CoreService_pb2_grpc import CoreServiceStub -from feast.serving.ServingService_pb2 import ( - GetFeastServingInfoResponse, - GetOnlineFeaturesRequest, - GetOnlineFeaturesResponse, -) -from feast.types import FeatureRow_pb2 as FeatureRowProto -from feast.types import Field_pb2 as FieldProto -from feast.types import Value_pb2 as ValueProto +from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse _ONE_DAY_IN_SECONDS = 60 * 60 * 24 @@ -61,74 +53,6 @@ def __get_feature_sets_from_core(self): def GetFeastServingVersion(self, request, context): return GetFeastServingInfoResponse(version="0.3.2") - def GetOnlineFeatures(self, request: GetOnlineFeaturesRequest, context): - - response = GetOnlineFeaturesResponse( - feature_data_sets=[ - GetOnlineFeaturesResponse.FeatureDataSet( - name="feature_set_1", - feature_rows=[ - FeatureRowProto.FeatureRow( - feature_set="feature_set_1", - event_timestamp=Timestamp(), - fields=[ - FieldProto.Field( - name="feature_1", - value=ValueProto.Value(float_val=1.2), - ), - FieldProto.Field( - name="feature_2", - value=ValueProto.Value(float_val=1.2), - ), - FieldProto.Field( - name="feature_3", - value=ValueProto.Value(float_val=1.2), - ), - ], - ), - FeatureRowProto.FeatureRow( - feature_set="feature_set_1", - event_timestamp=Timestamp(), - fields=[ - FieldProto.Field( - name="feature_1", - value=ValueProto.Value(float_val=1.2), - ), - FieldProto.Field( - name="feature_2", - value=ValueProto.Value(float_val=1.2), - ), - FieldProto.Field( - name="feature_3", - value=ValueProto.Value(float_val=1.2), - ), - ], - ), - FeatureRowProto.FeatureRow( - feature_set="feature_set_1", - event_timestamp=Timestamp(), - fields=[ - FieldProto.Field( - name="feature_1", - value=ValueProto.Value(float_val=1.2), - ), - FieldProto.Field( - name="feature_2", - value=ValueProto.Value(float_val=1.2), - ), - FieldProto.Field( - name="feature_3", - value=ValueProto.Value(float_val=1.2), - ), - ], - ), - ], - ) - ] - ) - - return response - def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) diff --git a/sdk/python/tests/grpc/test_auth.py b/sdk/python/tests/grpc/test_auth.py index 5045f357a0..90896ee925 100644 --- a/sdk/python/tests/grpc/test_auth.py +++ b/sdk/python/tests/grpc/test_auth.py @@ -17,13 +17,14 @@ from http import HTTPStatus from unittest.mock import call, patch +from pytest import fixture, raises + from feast.config import Config from feast.grpc.auth import ( GoogleOpenIDAuthMetadataPlugin, OAuthMetadataPlugin, get_auth_metadata_plugin, ) -from pytest import fixture, raises AUDIENCE = "https://testaudience.io/" diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 9a492c1721..06c61b5bae 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -22,13 +22,14 @@ import grpc import pandas as pd import pandavro +import pytest from google.protobuf.duration_pb2 import Duration +from mock import MagicMock, patch from pytz import timezone import dataframes import feast.core.CoreService_pb2_grpc as Core import feast.serving.ServingService_pb2_grpc as Serving -import pytest from feast.client import Client from feast.core.CoreService_pb2 import ( GetFeastCoreVersionResponse, @@ -71,7 +72,6 @@ DisallowAuthInterceptor, ) from feast_serving_server import ServingServicer -from mock import MagicMock, patch CORE_URL = "core.feast.example.com" SERVING_URL = "serving.example.com" diff --git a/sdk/python/tests/test_config.py b/sdk/python/tests/test_config.py index 8c30f2562c..9ed34a736a 100644 --- a/sdk/python/tests/test_config.py +++ b/sdk/python/tests/test_config.py @@ -16,6 +16,7 @@ from tempfile import mkstemp import pytest + from feast.config import Config diff --git a/sdk/python/tests/test_feature_set.py b/sdk/python/tests/test_feature_set.py index 31689403a6..5f36d8fe69 100644 --- a/sdk/python/tests/test_feature_set.py +++ b/sdk/python/tests/test_feature_set.py @@ -19,12 +19,12 @@ import grpc import pandas as pd +import pytest import pytz from google.protobuf import json_format import dataframes import feast.core.CoreService_pb2_grpc as Core -import pytest from feast.client import Client from feast.entity import Entity from feast.feature_set import (