diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto
index 41bba6443f..82f6c4ab4a 100644
--- a/protos/feast/core/DataSource.proto
+++ b/protos/feast/core/DataSource.proto
@@ -26,6 +26,7 @@ import "feast/core/DataFormat.proto";
 import "feast/types/Value.proto";
 
 // Defines a Data Source that can be used source Feature data
+// Next available id: 22
 message DataSource {
   // Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
   // but they are going to be reserved for backwards compatibility.
@@ -45,6 +46,13 @@ message DataSource {
     REQUEST_SOURCE = 7;
   }
 
+  // Unique name of data source within the project
+  string name = 20;
+
+  // Name of Feast project that this data source belongs to.
+  string project = 21;
+
   SourceType type = 1;
 
   // Defines mapping between fields in the sourced data
@@ -156,9 +164,7 @@ message DataSource {
 
   // Defines options for DataSource that sources features from request data
   message RequestDataOptions {
-    // Name of the request data source
-    string name = 1;
-
+    reserved 1;
     // Mapping of feature name to type
     map<string, feast.types.ValueType.Enum> schema = 2;
   }
diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto
index 58feff5bfd..bd9a289a1b 100644
--- a/protos/feast/core/OnDemandFeatureView.proto
+++ b/protos/feast/core/OnDemandFeatureView.proto
@@ -48,8 +48,6 @@ message OnDemandFeatureViewSpec {
   map<string, OnDemandInput> inputs = 4;
 
   UserDefinedFunction user_defined_function = 5;
-
-
 }
 
 message OnDemandFeatureViewMeta {
diff --git a/protos/feast/core/Registry.proto b/protos/feast/core/Registry.proto
index 3deeb97238..a19e4c0a09 100644
--- a/protos/feast/core/Registry.proto
+++ b/protos/feast/core/Registry.proto
@@ -28,13 +28,16 @@ import "feast/core/FeatureView.proto";
 import "feast/core/InfraObject.proto";
 import "feast/core/OnDemandFeatureView.proto";
 import "feast/core/RequestFeatureView.proto";
+import "feast/core/DataSource.proto";
 import "feast/core/SavedDataset.proto";
 import "google/protobuf/timestamp.proto";
 
+// Next id: 13
 message Registry {
   repeated Entity entities = 1;
   repeated FeatureTable feature_tables = 2;
   repeated FeatureView feature_views = 6;
+  repeated DataSource data_sources = 12;
   repeated OnDemandFeatureView on_demand_feature_views = 8;
   repeated RequestFeatureView request_feature_views = 9;
   repeated FeatureService feature_services = 7;
diff --git a/protos/feast/core/RequestFeatureView.proto b/protos/feast/core/RequestFeatureView.proto
index c9ee540e6f..39711d8d3f 100644
--- a/protos/feast/core/RequestFeatureView.proto
+++ b/protos/feast/core/RequestFeatureView.proto
@@ -22,8 +22,6 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
 option java_outer_classname = "RequestFeatureViewProto";
 option java_package = "feast.proto.core";
 
-import "feast/core/FeatureView.proto";
-import "feast/core/Feature.proto";
 import "feast/core/DataSource.proto";
 
 message RequestFeatureView {
diff --git a/protos/feast/core/SavedDataset.proto b/protos/feast/core/SavedDataset.proto
index ebd2e56d35..e6d103a691 100644
--- a/protos/feast/core/SavedDataset.proto
+++ b/protos/feast/core/SavedDataset.proto
@@ -23,8 +23,8 @@ option java_outer_classname = "SavedDatasetProto";
 option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
 
 import "google/protobuf/timestamp.proto";
-import "feast/core/FeatureViewProjection.proto";
 import "feast/core/DataSource.proto";
+import "feast/core/FeatureService.proto";
 
 message SavedDatasetSpec {
   // Name of the dataset. Must be unique since it's possible to overwrite dataset by name
@@ -44,6 +44,9 @@ message SavedDatasetSpec {
 
   SavedDatasetStorage storage = 6;
 
+  // Optional and only populated if generated from a feature service fetch
+  string feature_service_name = 8;
+
   // User defined metadata
   map<string, string> tags = 7;
 }
diff --git a/protos/feast/core/ValidationProfile.proto b/protos/feast/core/ValidationProfile.proto
index 31c4e150a0..502e79f72b 100644
--- a/protos/feast/core/ValidationProfile.proto
+++ b/protos/feast/core/ValidationProfile.proto
@@ -22,7 +22,6 @@ option java_package = "feast.proto.core";
 option java_outer_classname = "ValidationProfile";
 option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
 
-import "google/protobuf/timestamp.proto";
 import "feast/core/SavedDataset.proto";
 
 message GEValidationProfiler {
diff --git a/protos/feast/storage/Redis.proto b/protos/feast/storage/Redis.proto
index a662e352f4..bc9d0c61d3 100644
--- a/protos/feast/storage/Redis.proto
+++ b/protos/feast/storage/Redis.proto
@@ -16,7 +16,6 @@
 
 syntax = "proto3";
 
-import "feast/types/Field.proto";
 import "feast/types/Value.proto";
 
 package feast.storage;
diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py
index febac56fcc..d2a71bc561 100644
--- a/sdk/python/feast/cli.py
+++ b/sdk/python/feast/cli.py
@@ -126,6 +126,56 @@ def endpoint(ctx: click.Context):
         _logger.info("There is no active feature server.")
 
 
+@cli.group(name="data-sources")
+def data_sources_cmd():
+    """
+    Access data sources
+    """
+    pass
+
+
+@data_sources_cmd.command("describe")
+@click.argument("name", type=click.STRING)
+@click.pass_context
+def data_source_describe(ctx: click.Context, name: str):
+    """
+    Describe a data source
+    """
+    repo = ctx.obj["CHDIR"]
+    cli_check_repo(repo)
+    store = FeatureStore(repo_path=str(repo))
+
+    try:
+        data_source = store.get_data_source(name)
+    except FeastObjectNotFoundException as e:
+        print(e)
+        exit(1)
+
+    print(
+        yaml.dump(
+            yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
+        )
+    )
+
+
+@data_sources_cmd.command(name="list")
+@click.pass_context
+def data_source_list(ctx: click.Context):
+    """
+    List all data sources
+    """
+    repo = ctx.obj["CHDIR"]
+    cli_check_repo(repo)
+    store = FeatureStore(repo_path=str(repo))
+    table = []
+    for datasource in store.list_data_sources():
+        table.append([datasource.name, datasource.__class__])
+
+    from tabulate import tabulate
+
+    print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain"))
+
+
 @cli.group(name="entities")
 def entities_cmd():
     """
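The new CLI group above wraps store-level accessors added later in this diff. A minimal sketch of the programmatic equivalent — assuming a feature repo in the current directory; `customer_profile_source` is the named source from the example repo further down:

```python
from feast import FeatureStore
from feast.errors import FeastObjectNotFoundException

store = FeatureStore(repo_path=".")

# Equivalent of `feast data-sources list`
for ds in store.list_data_sources():
    print(ds.name, ds.__class__.__name__)

# Equivalent of `feast data-sources describe <name>`; raises a
# FeastObjectNotFoundException subclass when the name is unregistered.
try:
    print(store.get_data_source("customer_profile_source"))
except FeastObjectNotFoundException as e:
    print(e)
```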
""" + name: str event_timestamp_column: str created_timestamp_column: str field_mapping: Dict[str, str] @@ -156,12 +158,14 @@ class DataSource(ABC): def __init__( self, + name: str, event_timestamp_column: Optional[str] = None, created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, ): """Creates a DataSource object.""" + self.name = name self.event_timestamp_column = ( event_timestamp_column if event_timestamp_column else "" ) @@ -173,12 +177,16 @@ def __init__( date_partition_column if date_partition_column else "" ) + def __hash__(self): + return hash((id(self), self.name)) + def __eq__(self, other): if not isinstance(other, DataSource): raise TypeError("Comparisons should only involve DataSource class objects.") if ( - self.event_timestamp_column != other.event_timestamp_column + self.name != other.name + or self.event_timestamp_column != other.event_timestamp_column or self.created_timestamp_column != other.created_timestamp_column or self.field_mapping != other.field_mapping or self.date_partition_column != other.date_partition_column @@ -206,7 +214,9 @@ def from_proto(data_source: DataSourceProto) -> Any: cls = get_data_source_class_from_type(data_source.data_source_class_type) return cls.from_proto(data_source) - if data_source.file_options.file_format and data_source.file_options.file_url: + if data_source.request_data_options and data_source.request_data_options.schema: + data_source_obj = RequestDataSource.from_proto(data_source) + elif data_source.file_options.file_format and data_source.file_options.file_url: from feast.infra.offline_stores.file_source import FileSource data_source_obj = FileSource.from_proto(data_source) @@ -246,7 +256,7 @@ def from_proto(data_source: DataSourceProto) -> Any: @abstractmethod def to_proto(self) -> DataSourceProto: """ - Converts an DataSourceProto object to its protobuf representation. + Converts a DataSourceProto object to its protobuf representation. 
""" raise NotImplementedError @@ -296,6 +306,7 @@ def get_table_column_names_and_types( def __init__( self, + name: str, event_timestamp_column: str, bootstrap_servers: str, message_format: StreamFormat, @@ -305,6 +316,7 @@ def __init__( date_partition_column: Optional[str] = "", ): super().__init__( + name, event_timestamp_column, created_timestamp_column, field_mapping, @@ -335,6 +347,7 @@ def __eq__(self, other): @staticmethod def from_proto(data_source: DataSourceProto): return KafkaSource( + name=data_source.name, field_mapping=dict(data_source.field_mapping), bootstrap_servers=data_source.kafka_options.bootstrap_servers, message_format=StreamFormat.from_proto( @@ -348,6 +361,7 @@ def from_proto(data_source: DataSourceProto): def to_proto(self) -> DataSourceProto: data_source_proto = DataSourceProto( + name=self.name, type=DataSourceProto.STREAM_KAFKA, field_mapping=self.field_mapping, kafka_options=self.kafka_options.to_proto(), @@ -363,6 +377,9 @@ def to_proto(self) -> DataSourceProto: def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: return type_map.redshift_to_feast_value_type + def get_table_query_string(self) -> str: + raise NotImplementedError + class RequestDataSource(DataSource): """ @@ -373,10 +390,6 @@ class RequestDataSource(DataSource): schema: Schema mapping from the input feature name to a ValueType """ - @staticmethod - def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: - raise NotImplementedError - name: str schema: Dict[str, ValueType] @@ -384,8 +397,7 @@ def __init__( self, name: str, schema: Dict[str, ValueType], ): """Creates a RequestDataSource object.""" - super().__init__() - self.name = name + super().__init__(name) self.schema = schema def validate(self, config: RepoConfig): @@ -402,21 +414,28 @@ def from_proto(data_source: DataSourceProto): schema = {} for key in schema_pb.keys(): schema[key] = ValueType(schema_pb.get(key)) - return RequestDataSource( - name=data_source.request_data_options.name, schema=schema - ) + return RequestDataSource(name=data_source.name, schema=schema) def to_proto(self) -> DataSourceProto: schema_pb = {} for key, value in self.schema.items(): schema_pb[key] = value.value - options = DataSourceProto.RequestDataOptions(name=self.name, schema=schema_pb) + options = DataSourceProto.RequestDataOptions(schema=schema_pb) data_source_proto = DataSourceProto( - type=DataSourceProto.REQUEST_SOURCE, request_data_options=options + name=self.name, + type=DataSourceProto.REQUEST_SOURCE, + request_data_options=options, ) return data_source_proto + def get_table_query_string(self) -> str: + raise NotImplementedError + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + raise NotImplementedError + class KinesisSource(DataSource): def validate(self, config: RepoConfig): @@ -430,6 +449,7 @@ def get_table_column_names_and_types( @staticmethod def from_proto(data_source: DataSourceProto): return KinesisSource( + name=data_source.name, field_mapping=dict(data_source.field_mapping), record_format=StreamFormat.from_proto( data_source.kinesis_options.record_format @@ -445,8 +465,12 @@ def from_proto(data_source: DataSourceProto): def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: pass + def get_table_query_string(self) -> str: + raise NotImplementedError + def __init__( self, + name: str, event_timestamp_column: str, created_timestamp_column: str, record_format: StreamFormat, @@ -456,6 +480,7 @@ def __init__( date_partition_column: Optional[str] = 
"", ): super().__init__( + name, event_timestamp_column, created_timestamp_column, field_mapping, @@ -475,7 +500,8 @@ def __eq__(self, other): ) if ( - self.kinesis_options.record_format != other.kinesis_options.record_format + self.name != other.name + or self.kinesis_options.record_format != other.kinesis_options.record_format or self.kinesis_options.region != other.kinesis_options.region or self.kinesis_options.stream_name != other.kinesis_options.stream_name ): @@ -485,6 +511,7 @@ def __eq__(self, other): def to_proto(self) -> DataSourceProto: data_source_proto = DataSourceProto( + name=self.name, type=DataSourceProto.STREAM_KINESIS, field_mapping=self.field_mapping, kinesis_options=self.kinesis_options.to_proto(), diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 9b67dead2c..4558a149a5 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -2,11 +2,13 @@ from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, cast from feast.base_feature_view import BaseFeatureView +from feast.data_source import DataSource from feast.diff.property_diff import PropertyDiff, TransitionType from feast.entity import Entity -from feast.feast_object import FeastObject +from feast.feast_object import FeastObject, FeastObjectSpecProto from feast.feature_service import FeatureService from feast.feature_view import DUMMY_ENTITY_NAME +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto from feast.protos.feast.core.FeatureService_pb2 import ( FeatureService as FeatureServiceProto, @@ -89,6 +91,7 @@ def tag_objects_for_keep_delete_update_add( FeastObjectProto = TypeVar( "FeastObjectProto", + DataSourceProto, EntityProto, FeatureViewProto, FeatureServiceProto, @@ -108,23 +111,33 @@ def diff_registry_objects( assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name property_diffs = [] transition: TransitionType = TransitionType.UNCHANGED - if current_proto.spec != new_proto.spec: - for _field in current_proto.spec.DESCRIPTOR.fields: + + current_spec: FeastObjectSpecProto + new_spec: FeastObjectSpecProto + if isinstance(current_proto, DataSourceProto) or isinstance( + new_proto, DataSourceProto + ): + assert type(current_proto) == type(new_proto) + current_spec = cast(DataSourceProto, current_proto) + new_spec = cast(DataSourceProto, new_proto) + else: + current_spec = current_proto.spec + new_spec = new_proto.spec + if current_spec != new_spec: + for _field in current_spec.DESCRIPTOR.fields: if _field.name in FIELDS_TO_IGNORE: continue - if getattr(current_proto.spec, _field.name) != getattr( - new_proto.spec, _field.name - ): + if getattr(current_spec, _field.name) != getattr(new_spec, _field.name): transition = TransitionType.UPDATE property_diffs.append( PropertyDiff( _field.name, - getattr(current_proto.spec, _field.name), - getattr(new_proto.spec, _field.name), + getattr(current_spec, _field.name), + getattr(new_spec, _field.name), ) ) return FeastObjectDiff( - name=new_proto.spec.name, + name=new_spec.name, feast_object_type=object_type, current_feast_object=current, new_feast_object=new, @@ -281,6 +294,12 @@ def apply_diff_to_registry( TransitionType.CREATE, TransitionType.UPDATE, ]: + if feast_object_diff.feast_object_type == FeastObjectType.DATA_SOURCE: + registry.apply_data_source( + cast(DataSource, feast_object_diff.new_feast_object), + project, + commit=False, + ) if 
diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py
index 9b67dead2c..4558a149a5 100644
--- a/sdk/python/feast/diff/registry_diff.py
+++ b/sdk/python/feast/diff/registry_diff.py
@@ -2,11 +2,13 @@
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, cast
 
 from feast.base_feature_view import BaseFeatureView
+from feast.data_source import DataSource
 from feast.diff.property_diff import PropertyDiff, TransitionType
 from feast.entity import Entity
-from feast.feast_object import FeastObject
+from feast.feast_object import FeastObject, FeastObjectSpecProto
 from feast.feature_service import FeatureService
 from feast.feature_view import DUMMY_ENTITY_NAME
+from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
 from feast.protos.feast.core.FeatureService_pb2 import (
     FeatureService as FeatureServiceProto,
@@ -89,6 +91,7 @@ def tag_objects_for_keep_delete_update_add(
 
 FeastObjectProto = TypeVar(
     "FeastObjectProto",
+    DataSourceProto,
     EntityProto,
     FeatureViewProto,
     FeatureServiceProto,
@@ -108,23 +111,33 @@ def diff_registry_objects(
     assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name
     property_diffs = []
     transition: TransitionType = TransitionType.UNCHANGED
-    if current_proto.spec != new_proto.spec:
-        for _field in current_proto.spec.DESCRIPTOR.fields:
+
+    current_spec: FeastObjectSpecProto
+    new_spec: FeastObjectSpecProto
+    if isinstance(current_proto, DataSourceProto) or isinstance(
+        new_proto, DataSourceProto
+    ):
+        assert type(current_proto) == type(new_proto)
+        current_spec = cast(DataSourceProto, current_proto)
+        new_spec = cast(DataSourceProto, new_proto)
+    else:
+        current_spec = current_proto.spec
+        new_spec = new_proto.spec
+    if current_spec != new_spec:
+        for _field in current_spec.DESCRIPTOR.fields:
             if _field.name in FIELDS_TO_IGNORE:
                 continue
-            if getattr(current_proto.spec, _field.name) != getattr(
-                new_proto.spec, _field.name
-            ):
+            if getattr(current_spec, _field.name) != getattr(new_spec, _field.name):
                 transition = TransitionType.UPDATE
                 property_diffs.append(
                     PropertyDiff(
                         _field.name,
-                        getattr(current_proto.spec, _field.name),
-                        getattr(new_proto.spec, _field.name),
+                        getattr(current_spec, _field.name),
+                        getattr(new_spec, _field.name),
                     )
                 )
     return FeastObjectDiff(
-        name=new_proto.spec.name,
+        name=new_spec.name,
         feast_object_type=object_type,
         current_feast_object=current,
         new_feast_object=new,
@@ -281,6 +294,12 @@ def apply_diff_to_registry(
             TransitionType.CREATE,
             TransitionType.UPDATE,
         ]:
+            if feast_object_diff.feast_object_type == FeastObjectType.DATA_SOURCE:
+                registry.apply_data_source(
+                    cast(DataSource, feast_object_diff.new_feast_object),
+                    project,
+                    commit=False,
+                )
             if feast_object_diff.feast_object_type == FeastObjectType.ENTITY:
                 registry.apply_entity(
                     cast(Entity, feast_object_diff.new_feast_object),
diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py
index 17147f8a60..942d3e9c16 100644
--- a/sdk/python/feast/errors.py
+++ b/sdk/python/feast/errors.py
@@ -10,6 +10,13 @@ def __init__(self, path):
         )
 
 
+class DataSourceNoNameException(Exception):
+    def __init__(self):
+        super().__init__(
+            "Unable to infer a name for this data source. Either table or name must be specified."
+        )
+
+
 class FeastObjectNotFoundException(Exception):
     pass
 
@@ -64,6 +71,14 @@ def __init__(self, feature_names):
         )
 
 
+class DataSourceObjectNotFoundException(FeastObjectNotFoundException):
+    def __init__(self, name, project=None):
+        if project:
+            super().__init__(f"Data source {name} does not exist in project {project}")
+        else:
+            super().__init__(f"Data source {name} does not exist")
+
+
 class S3RegistryBucketNotExist(FeastObjectNotFoundException):
     def __init__(self, bucket):
         super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist")
diff --git a/sdk/python/feast/feast_object.py b/sdk/python/feast/feast_object.py
index a9e01c9be4..4ffd693c44 100644
--- a/sdk/python/feast/feast_object.py
+++ b/sdk/python/feast/feast_object.py
@@ -1,12 +1,33 @@
 from typing import Union
 
+from .data_source import DataSource
 from .entity import Entity
 from .feature_service import FeatureService
 from .feature_view import FeatureView
 from .on_demand_feature_view import OnDemandFeatureView
+from .protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
+from .protos.feast.core.Entity_pb2 import EntitySpecV2
+from .protos.feast.core.FeatureService_pb2 import FeatureServiceSpec
+from .protos.feast.core.FeatureView_pb2 import FeatureViewSpec
+from .protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
+from .protos.feast.core.RequestFeatureView_pb2 import RequestFeatureViewSpec
 from .request_feature_view import RequestFeatureView
 
 # Convenience type representing all Feast objects
 FeastObject = Union[
-    FeatureView, OnDemandFeatureView, RequestFeatureView, Entity, FeatureService
+    FeatureView,
+    OnDemandFeatureView,
+    RequestFeatureView,
+    Entity,
+    FeatureService,
+    DataSource,
+]
+
+FeastObjectSpecProto = Union[
+    FeatureViewSpec,
+    OnDemandFeatureViewSpec,
+    RequestFeatureViewSpec,
+    EntitySpecV2,
+    FeatureServiceSpec,
+    DataSourceProto,
 ]
diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index ed944804c5..89e4df1d5f 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -39,8 +39,10 @@
 
 from feast import feature_server, flags, flags_helper, utils
 from feast.base_feature_view import BaseFeatureView
+from feast.data_source import DataSource
 from feast.diff.infra_diff import InfraDiff, diff_infra_protos
 from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
+from feast.dqm.profilers.ge_profiler import GEProfiler
 from feast.entity import Entity
 from feast.errors import (
     EntityNotFoundException,
@@ -260,6 +262,19 @@ def list_on_demand_feature_views(
             self.project, allow_cache=allow_cache
         )
 
+    @log_exceptions_and_usage
+    def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
+        """
+        Retrieves the list of data sources from the registry.
+
+        Args:
+            allow_cache: Whether to allow returning data sources from a cached registry.
+
+        Returns:
+            A list of data sources.
+ """ + return self._registry.list_data_sources(self.project, allow_cache=allow_cache) + @log_exceptions_and_usage def get_entity(self, name: str) -> Entity: """ @@ -285,6 +300,7 @@ def get_feature_service( Args: name: Name of feature service. + allow_cache: Whether to allow returning feature services from a cached registry. Returns: The specified feature service. @@ -334,6 +350,22 @@ def get_on_demand_feature_view(self, name: str) -> OnDemandFeatureView: """ return self._registry.get_on_demand_feature_view(name, self.project) + @log_exceptions_and_usage + def get_data_source(self, name: str) -> DataSource: + """ + Retrieves the list of data sources from the registry. + + Args: + name: Name of the data source. + + Returns: + The specified data source. + + Raises: + DataSourceObjectNotFoundException: The data source could not be found. + """ + return self._registry.get_data_source(name, self.project) + @log_exceptions_and_usage def delete_feature_view(self, name: str): """ @@ -419,6 +451,7 @@ def _validate_all_feature_views( def _make_inferences( self, + data_sources_to_update: List[DataSource], entities_to_update: List[Entity], views_to_update: List[FeatureView], odfvs_to_update: List[OnDemandFeatureView], @@ -428,6 +461,10 @@ def _make_inferences( entities_to_update, views_to_update, self.config ) + update_data_sources_with_inferred_event_timestamp_col( + data_sources_to_update, self.config + ) + update_data_sources_with_inferred_event_timestamp_col( [view.batch_source for view in views_to_update], self.config ) @@ -479,7 +516,13 @@ def _plan( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> registry_diff, infra_diff, new_infra = fs._plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view + >>> registry_diff, infra_diff, new_infra = fs._plan(RepoContents( + ... data_sources={driver_hourly_stats}, + ... feature_views={driver_hourly_stats_view}, + ... on_demand_feature_views=set(), + ... request_feature_views=set(), + ... entities={driver}, + ... feature_services=set())) # register entity and feature view """ # Validate and run inference on all the objects to be registered. 
         self._validate_all_feature_views(
             list(desired_repo_contents.feature_views),
             list(desired_repo_contents.on_demand_feature_views),
             list(desired_repo_contents.request_feature_views),
         )
+        _validate_data_sources(list(desired_repo_contents.data_sources))
         self._make_inferences(
+            list(desired_repo_contents.data_sources),
             list(desired_repo_contents.entities),
             list(desired_repo_contents.feature_views),
             list(desired_repo_contents.on_demand_feature_views),
@@ -529,12 +574,14 @@ def _apply_diffs(
         apply_diff_to_registry(
             self._registry, registry_diff, self.project, commit=False
         )
+
         self._registry.update_infra(new_infra, self.project, commit=True)
 
     @log_exceptions_and_usage
     def apply(
         self,
         objects: Union[
+            DataSource,
             Entity,
             FeatureView,
             OnDemandFeatureView,
@@ -598,22 +645,31 @@ def apply(
         ]
         odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
         services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
+        data_sources_to_update = [ob for ob in objects if isinstance(ob, DataSource)]
 
         if len(entities_to_update) + len(views_to_update) + len(
             request_views_to_update
-        ) + len(odfvs_to_update) + len(services_to_update) != len(objects):
+        ) + len(odfvs_to_update) + len(services_to_update) + len(
+            data_sources_to_update
+        ) != len(
+            objects
+        ):
             raise ValueError("Unknown object type provided as part of apply() call")
 
         # Validate all feature views and make inferences.
         self._validate_all_feature_views(
             views_to_update, odfvs_to_update, request_views_to_update
         )
-        self._make_inferences(entities_to_update, views_to_update, odfvs_to_update)
+        self._make_inferences(
+            data_sources_to_update, entities_to_update, views_to_update, odfvs_to_update
+        )
 
         # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity.
         entities_to_update.append(DUMMY_ENTITY)
 
         # Add all objects to the registry and update the provider's infrastructure.
+        for ds in data_sources_to_update:
+            self._registry.apply_data_source(ds, project=self.project, commit=False)
         for view in itertools.chain(
             views_to_update, odfvs_to_update, request_views_to_update
         ):
@@ -642,7 +698,14 @@ def apply(
             services_to_delete = [
                 ob for ob in objects_to_delete if isinstance(ob, FeatureService)
             ]
+            data_sources_to_delete = [
+                ob for ob in objects_to_delete if isinstance(ob, DataSource)
+            ]
 
+            for data_source in data_sources_to_delete:
+                self._registry.delete_data_source(
+                    data_source.name, project=self.project, commit=False
+                )
             for entity in entities_to_delete:
                 self._registry.delete_entity(
                     entity.name, project=self.project, commit=False
@@ -817,6 +880,8 @@ def create_saved_dataset(
         name: str,
         storage: SavedDatasetStorage,
         tags: Optional[Dict[str, str]] = None,
+        feature_service: Optional[FeatureService] = None,
+        profiler: Optional[GEProfiler] = None,
     ) -> SavedDataset:
         """
         Execute provided retrieval job and persist its outcome in given storage.
@@ -851,6 +916,7 @@ def create_saved_dataset(
             full_feature_names=from_.full_feature_names,
             storage=storage,
             tags=tags,
+            feature_service_name=feature_service.name if feature_service else None,
         )
 
         dataset.min_event_timestamp = from_.metadata.min_event_timestamp
@@ -858,14 +924,15 @@ def create_saved_dataset(
 
         from_.persist(storage)
 
-        self._registry.apply_saved_dataset(dataset, self.project, commit=True)
-
-        return dataset.with_retrieval_job(
+        dataset = dataset.with_retrieval_job(
             self._get_provider().retrieve_saved_dataset(
                 config=self.config, dataset=dataset
             )
         )
 
+        self._registry.apply_saved_dataset(dataset, self.project, commit=True)
+        return dataset
+
     @log_exceptions_and_usage
     def get_saved_dataset(self, name: str) -> SavedDataset:
         """
@@ -986,7 +1053,7 @@ def tqdm_builder(length):
             )
 
             self._registry.apply_materialization(
-                feature_view, self.project, start_date, end_date
+                feature_view, self.project, start_date, end_date,
             )
 
     @log_exceptions_and_usage
@@ -1073,7 +1140,7 @@ def tqdm_builder(length):
             )
 
             self._registry.apply_materialization(
-                feature_view, self.project, start_date, end_date
+                feature_view, self.project, start_date, end_date,
             )
 
     @log_exceptions_and_usage
@@ -1901,3 +1968,18 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]):
             )
         else:
             fv_names.add(case_insensitive_fv_name)
+
+
+def _validate_data_sources(data_sources: List[DataSource]):
+    """ Verify data sources have case-insensitively unique names"""
+    ds_names = set()
+    for fv in data_sources:
+        case_insensitive_ds_name = fv.name.lower()
+        if case_insensitive_ds_name in ds_names:
+            raise ValueError(
+                f"More than one data source with name {case_insensitive_ds_name} found. "
+                f"Please ensure that all data source names are case-insensitively unique. "
+                f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
+            )
+        else:
+            ds_names.add(case_insensitive_ds_name)
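A sketch of the resulting apply flow (repo layout, path, and column names are hypothetical): data sources are validated for case-insensitively unique names during repo planning, registered before the feature views that reference them, and retrievable afterwards.

```python
from feast import FeatureStore, FileSource

store = FeatureStore(repo_path=".")

source = FileSource(
    name="driver_stats",
    path="data/driver_stats.parquet",
    # Given explicitly, so the inference pass does not need to read the file.
    event_timestamp_column="event_timestamp",
)
store.apply([source])

assert store.get_data_source("driver_stats").name == "driver_stats"

# A second source named "DRIVER_STATS" in the same repo would be rejected by
# _validate_data_sources during `feast plan` / `feast apply`.
```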
""", ) # for informing the type checker diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 4c4d2a591c..6c5be2b5f4 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -1,8 +1,9 @@ +import warnings from typing import Callable, Dict, Iterable, List, Optional, Tuple from feast import type_map from feast.data_source import DataSource -from feast.errors import DataSourceNotFoundException +from feast.errors import DataSourceNoNameException, DataSourceNotFoundException from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, @@ -15,22 +16,67 @@ class BigQuerySource(DataSource): def __init__( self, + name: Optional[str] = None, event_timestamp_column: Optional[str] = "", + table: Optional[str] = None, table_ref: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", query: Optional[str] = None, ): - self.bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) + """Create a BigQuerySource from an existing table or query. + + Args: + name (optional): Name for the source. Defaults to the table_ref if not specified. + table (optional): The BigQuery table where features can be found. + table_ref (optional): (Deprecated) The BigQuery table where features can be found. + event_timestamp_column: Event timestamp column used for point in time joins of feature values. + created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows. + field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table + or view. Only used for feature columns, not entities or timestamp columns. + date_partition_column (optional): Timestamp column used for partitioning. + query (optional): SQL query to execute to generate data for this data source. + + Example: + >>> from feast import BigQuerySource + >>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table") + """ + if table is None and table_ref is None and query is None: + raise ValueError('No "table" argument provided.') + if not table and table_ref: + warnings.warn( + ( + "The argument 'table_ref' is being deprecated. Please use 'table' " + "instead. Feast 0.20 and onwards will not support the argument 'table_ref'." 
+                ),
+                DeprecationWarning,
+            )
+            table = table_ref
+        self.bigquery_options = BigQueryOptions(table_ref=table, query=query)
+
+        # If no name, use the table_ref as the default name
+        _name = name
+        if not _name:
+            if table:
+                _name = table
+            elif table_ref:
+                _name = table_ref
+            else:
+                raise DataSourceNoNameException()
 
         super().__init__(
+            _name if _name else "",
             event_timestamp_column,
             created_timestamp_column,
             field_mapping,
             date_partition_column,
         )
 
+    # Note: Python requires redefining hash in child classes that override __eq__
+    def __hash__(self):
+        return super().__hash__()
+
     def __eq__(self, other):
         if not isinstance(other, BigQuerySource):
             raise TypeError(
@@ -38,7 +84,8 @@ def __eq__(self, other):
             )
 
         return (
-            self.bigquery_options.table_ref == other.bigquery_options.table_ref
+            self.name == other.name
+            and self.bigquery_options.table_ref == other.bigquery_options.table_ref
             and self.bigquery_options.query == other.bigquery_options.query
             and self.event_timestamp_column == other.event_timestamp_column
             and self.created_timestamp_column == other.created_timestamp_column
@@ -59,6 +106,7 @@ def from_proto(data_source: DataSourceProto):
         assert data_source.HasField("bigquery_options")
 
         return BigQuerySource(
+            name=data_source.name,
             field_mapping=dict(data_source.field_mapping),
             table_ref=data_source.bigquery_options.table_ref,
             event_timestamp_column=data_source.event_timestamp_column,
@@ -69,6 +117,7 @@ def from_proto(data_source: DataSourceProto):
 
     def to_proto(self) -> DataSourceProto:
         data_source_proto = DataSourceProto(
+            name=self.name,
             type=DataSourceProto.BATCH_BIGQUERY,
             field_mapping=self.field_mapping,
             bigquery_options=self.bigquery_options.to_proto(),
@@ -132,7 +181,9 @@ class BigQueryOptions:
     DataSource BigQuery options used to source features from BigQuery query
     """
 
-    def __init__(self, table_ref: Optional[str], query: Optional[str]):
+    def __init__(
+        self, table_ref: Optional[str], query: Optional[str],
+    ):
         self._table_ref = table_ref
         self._query = query
diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
index 50e365a631..3ffdf6eda0 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
@@ -8,6 +8,7 @@
 from pyspark.sql import SparkSession
 
 from feast.data_source import DataSource
+from feast.errors import DataSourceNoNameException
 from feast.infra.offline_stores.offline_utils import get_temp_entity_table_name
 from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.protos.feast.core.SavedDataset_pb2 import (
@@ -30,6 +31,7 @@ class SparkSourceFormat(Enum):
 class SparkSource(DataSource):
     def __init__(
         self,
+        name: Optional[str] = None,
         table: Optional[str] = None,
         query: Optional[str] = None,
         path: Optional[str] = None,
@@ -39,7 +41,15 @@ def __init__(
         field_mapping: Optional[Dict[str, str]] = None,
         date_partition_column: Optional[str] = None,
     ):
+        # If no name, use the table_ref as the default name
+        _name = name
+        if not _name:
+            if table:
+                _name = table
+            else:
+                raise DataSourceNoNameException()
         super().__init__(
+            _name,
             event_timestamp_column,
             created_timestamp_column,
             field_mapping,
@@ -106,6 +116,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
         spark_options = SparkOptions.from_proto(data_source.custom_options)
 
         return SparkSource(
+            name=data_source.name,
             field_mapping=dict(data_source.field_mapping),
             table=spark_options.table,
             query=spark_options.query,
@@ -118,6 +129,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
 
     def to_proto(self) -> DataSourceProto:
         data_source_proto = DataSourceProto(
+            name=self.name,
             type=DataSourceProto.CUSTOM_SOURCE,
             field_mapping=self.field_mapping,
             custom_options=self.spark_options.to_proto(),
diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py
index 1c1431043c..59e703dd6f 100644
--- a/sdk/python/feast/infra/offline_stores/file_source.py
+++ b/sdk/python/feast/infra/offline_stores/file_source.py
@@ -19,10 +19,10 @@ class FileSource(DataSource):
     def __init__(
         self,
+        path: str,
+        name: Optional[str] = "",
         event_timestamp_column: Optional[str] = "",
-        file_url: Optional[str] = None,
-        path: Optional[str] = None,
-        file_format: FileFormat = None,
+        file_format: Optional[FileFormat] = None,
         created_timestamp_column: Optional[str] = "",
         field_mapping: Optional[Dict[str, str]] = None,
         date_partition_column: Optional[str] = "",
@@ -32,52 +32,50 @@ def __init__(
 
         Args:
+            name (optional): Name for the file source. Defaults to the path.
             path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
                 feature columns.
             event_timestamp_column: Event timestamp column used for point in time joins of feature values.
             created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
-            file_url: [Deprecated] Please see path
             file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format.
             field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
                 or view. Only used for feature columns, not entities or timestamp columns.
+            date_partition_column (optional): Timestamp column used for partitioning.
             s3_endpoint_override (optional): Overrides AWS S3 enpoint with custom S3 storage
 
         Examples:
            >>> from feast import FileSource
           >>> file_source = FileSource(path="my_features.parquet", event_timestamp_column="event_timestamp")
         """
-        if path is None and file_url is None:
+        if path is None:
             raise ValueError(
                 'No "path" argument provided. Please set "path" to the location of your file source.'
             )
-        if file_url:
-            from warnings import warn
-
-            warn(
-                'Argument "file_url" is being deprecated. Please use the "path" argument.'
-            )
-        else:
-            file_url = path
-
         self.file_options = FileOptions(
             file_format=file_format,
-            file_url=file_url,
+            file_url=path,
             s3_endpoint_override=s3_endpoint_override,
         )
 
         super().__init__(
+            name if name else path,
             event_timestamp_column,
             created_timestamp_column,
             field_mapping,
             date_partition_column,
         )
 
+    # Note: Python requires redefining hash in child classes that override __eq__
+    def __hash__(self):
+        return super().__hash__()
+
     def __eq__(self, other):
         if not isinstance(other, FileSource):
             raise TypeError("Comparisons should only involve FileSource class objects.")
 
         return (
-            self.file_options.file_url == other.file_options.file_url
+            self.name == other.name
+            and self.file_options.file_url == other.file_options.file_url
             and self.file_options.file_format == other.file_options.file_format
             and self.event_timestamp_column == other.event_timestamp_column
             and self.created_timestamp_column == other.created_timestamp_column
@@ -96,6 +94,7 @@ def path(self):
     @staticmethod
     def from_proto(data_source: DataSourceProto):
         return FileSource(
+            name=data_source.name,
             field_mapping=dict(data_source.field_mapping),
             file_format=FileFormat.from_proto(data_source.file_options.file_format),
             path=data_source.file_options.file_url,
@@ -107,6 +106,7 @@ def from_proto(data_source: DataSourceProto):
 
     def to_proto(self) -> DataSourceProto:
         data_source_proto = DataSourceProto(
+            name=self.name,
             type=DataSourceProto.BATCH_FILE,
             field_mapping=self.field_mapping,
             file_options=self.file_options.to_proto(),
@@ -149,6 +149,9 @@ def create_filesystem_and_path(
         else:
             return None, path
 
+    def get_table_query_string(self) -> str:
+        pass
+
 
 class FileOptions:
     """
diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py
index 398374b6a8..19b88544a0 100644
--- a/sdk/python/feast/infra/offline_stores/redshift_source.py
+++ b/sdk/python/feast/infra/offline_stores/redshift_source.py
@@ -2,7 +2,11 @@
 
 from feast import type_map
 from feast.data_source import DataSource
-from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError
+from feast.errors import (
+    DataSourceNoNameException,
+    DataSourceNotFoundException,
+    RedshiftCredentialsError,
+)
 from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.protos.feast.core.SavedDataset_pb2 import (
     SavedDatasetStorage as SavedDatasetStorageProto,
@@ -15,6 +19,7 @@ class RedshiftSource(DataSource):
     def __init__(
         self,
+        name: Optional[str] = None,
         event_timestamp_column: Optional[str] = "",
         table: Optional[str] = None,
         schema: Optional[str] = None,
@@ -27,6 +32,7 @@ def __init__(
         Creates a RedshiftSource object.
 
         Args:
+            name (optional): Name for the source. Defaults to the table_ref if not specified.
             event_timestamp_column (optional): Event timestamp column used for point in
                 time joins of feature values.
             table (optional): Redshift table where the features are stored.
@@ -38,7 +44,18 @@ def __init__(
             date_partition_column (optional): Timestamp column used for partitioning.
             query (optional): The query to be executed to obtain the features.
""" + if table is None and query is None: + raise ValueError('No "table" argument provided.') + _name = name + if not _name: + if table: + _name = table + else: + raise DataSourceNoNameException() + + # TODO(adchia): figure out what to do if user uses the query to start super().__init__( + _name, event_timestamp_column, created_timestamp_column, field_mapping, @@ -80,7 +97,8 @@ def __eq__(self, other): ) return ( - self.redshift_options.table == other.redshift_options.table + self.name == other.name + and self.redshift_options.table == other.redshift_options.table and self.redshift_options.schema == other.redshift_options.schema and self.redshift_options.query == other.redshift_options.query and self.event_timestamp_column == other.event_timestamp_column diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 4e1553f486..ff36f1a66a 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -2,6 +2,7 @@ from feast import type_map from feast.data_source import DataSource +from feast.errors import DataSourceNoNameException from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, @@ -14,6 +15,7 @@ class SnowflakeSource(DataSource): def __init__( self, + name: Optional[str] = None, database: Optional[str] = None, schema: Optional[str] = None, table: Optional[str] = None, @@ -27,6 +29,7 @@ def __init__( Creates a SnowflakeSource object. Args: + name (optional): Name for the source. Defaults to the table if not specified. database (optional): Snowflake database where the features are stored. schema (optional): Snowflake schema in which the table is located. table (optional): Snowflake table where the features are stored. @@ -40,7 +43,19 @@ def __init__( date_partition_column (optional): Timestamp column used for partitioning. 
""" + if table is None and query is None: + raise ValueError('No "table" argument provided.') + + # If no name, use the table as the default name + _name = name + if not _name: + if table: + _name = table + else: + raise DataSourceNoNameException() + super().__init__( + _name, event_timestamp_column, created_timestamp_column, field_mapping, @@ -83,7 +98,8 @@ def __eq__(self, other): ) return ( - self.snowflake_options.database == other.snowflake_options.database + self.name == other.name + and self.snowflake_options.database == other.snowflake_options.database and self.snowflake_options.schema == other.snowflake_options.schema and self.snowflake_options.table == other.snowflake_options.table and self.snowflake_options.query == other.snowflake_options.query diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 1c47274067..cb1261d8c9 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -21,14 +21,18 @@ from typing import Any, Dict, List, Optional, Set from urllib.parse import urlparse +import dill from google.protobuf.internal.containers import RepeatedCompositeFieldContainer from google.protobuf.json_format import MessageToJson from proto import Message from feast.base_feature_view import BaseFeatureView +from feast.data_source import DataSource from feast.entity import Entity from feast.errors import ( ConflictingFeatureViewNames, + DataSourceNotFoundException, + DataSourceObjectNotFoundException, EntityNotFoundException, FeatureServiceNotFoundException, FeatureViewNotFoundException, @@ -65,6 +69,7 @@ class FeastObjectType(Enum): + DATA_SOURCE = "data source" ENTITY = "entity" FEATURE_VIEW = "feature view" ON_DEMAND_FEATURE_VIEW = "on demand feature view" @@ -76,6 +81,7 @@ def get_objects_from_registry( registry: "Registry", project: str ) -> Dict["FeastObjectType", List[Any]]: return { + FeastObjectType.DATA_SOURCE: registry.list_data_sources(project=project), FeastObjectType.ENTITY: registry.list_entities(project=project), FeastObjectType.FEATURE_VIEW: registry.list_feature_views(project=project), FeastObjectType.ON_DEMAND_FEATURE_VIEW: registry.list_on_demand_feature_views( @@ -94,6 +100,7 @@ def get_objects_from_repo_contents( repo_contents: RepoContents, ) -> Dict["FeastObjectType", Set[Any]]: return { + FeastObjectType.DATA_SOURCE: repo_contents.data_sources, FeastObjectType.ENTITY: repo_contents.entities, FeastObjectType.FEATURE_VIEW: repo_contents.feature_views, FeastObjectType.ON_DEMAND_FEATURE_VIEW: repo_contents.on_demand_feature_views, @@ -275,6 +282,70 @@ def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity] entities.append(Entity.from_proto(entity_proto)) return entities + def list_data_sources( + self, project: str, allow_cache: bool = False + ) -> List[DataSource]: + """ + Retrieve a list of data sources from the registry + + Args: + project: Filter data source based on project name + allow_cache: Whether to allow returning data sources from a cached registry + + Returns: + List of data sources + """ + registry_proto = self._get_registry_proto(allow_cache=allow_cache) + data_sources = [] + for data_source_proto in registry_proto.data_sources: + if data_source_proto.project == project: + data_sources.append(DataSource.from_proto(data_source_proto)) + return data_sources + + def apply_data_source( + self, data_source: DataSource, project: str, commit: bool = True + ): + """ + Registers a single data source with Feast + + Args: + data_source: A data source that will be registered + project: Feast 
+            commit: Whether to immediately commit to the registry
+        """
+        registry = self._prepare_registry_for_changes()
+
+        for idx, existing_data_source_proto in enumerate(registry.data_sources):
+            if existing_data_source_proto.name == data_source.name:
+                del registry.data_sources[idx]
+        data_source_proto = data_source.to_proto()
+        data_source_proto.project = project
+        registry.data_sources.append(data_source_proto)
+        if commit:
+            self.commit()
+
+    def delete_data_source(self, name: str, project: str, commit: bool = True):
+        """
+        Deletes a data source or raises an exception if not found.
+
+        Args:
+            name: Name of data source
+            project: Feast project that this data source belongs to
+            commit: Whether the change should be persisted immediately
+        """
+        self._prepare_registry_for_changes()
+        assert self.cached_registry_proto
+
+        for idx, data_source_proto in enumerate(
+            self.cached_registry_proto.data_sources
+        ):
+            if data_source_proto.name == name:
+                del self.cached_registry_proto.data_sources[idx]
+                if commit:
+                    self.commit()
+                return
+        raise DataSourceNotFoundException(name)
+
     def apply_feature_service(
         self, feature_service: FeatureService, project: str, commit: bool = True
     ):
@@ -464,7 +535,8 @@ def get_on_demand_feature_view(
 
         Args:
             name: Name of on demand feature view
-            project: Feast project that this on demand feature belongs to
+            project: Feast project that this on demand feature view belongs to
+            allow_cache: Whether to allow returning this on demand feature view from a cached registry
 
         Returns:
             Returns either the specified on demand feature view, or raises an exception if
@@ -480,6 +552,27 @@ def get_on_demand_feature_view(
                 return OnDemandFeatureView.from_proto(on_demand_feature_view)
         raise OnDemandFeatureViewNotFoundException(name, project=project)
 
+    def get_data_source(
+        self, name: str, project: str, allow_cache: bool = False
+    ) -> DataSource:
+        """
+        Retrieves a data source.
+
+        Args:
+            name: Name of data source
+            project: Feast project that this data source belongs to
+            allow_cache: Whether to allow returning this data source from a cached registry
+
+        Returns:
+            Returns either the specified data source, or raises an exception if none is found
+        """
+        registry = self._get_registry_proto(allow_cache=allow_cache)
+
+        for data_source in registry.data_sources:
+            if data_source.project == project and data_source.name == name:
+                return DataSource.from_proto(data_source)
+        raise DataSourceObjectNotFoundException(name, project=project)
+
     def apply_materialization(
         self,
         feature_view: FeatureView,
@@ -693,7 +786,7 @@ def delete_entity(self, name: str, project: str, commit: bool = True):
         raise EntityNotFoundException(name, project)
 
     def apply_saved_dataset(
-        self, saved_dataset: SavedDataset, project: str, commit: bool = True
+        self, saved_dataset: SavedDataset, project: str, commit: bool = True,
     ):
         """
         Registers a single entity with Feast
@@ -793,8 +886,14 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
         Args:
             project: Feast project to convert to a dict
         """
-        registry_dict = defaultdict(list)
-
+        registry_dict: Dict[str, Any] = defaultdict(list)
+        registry_dict["project"] = project
+        for data_source in sorted(
+            self.list_data_sources(project=project), key=lambda ds: ds.name
+        ):
+            registry_dict["dataSources"].append(
+                self._message_to_sorted_dict(data_source.to_proto())
+            )
         for entity in sorted(
             self.list_entities(project=project), key=lambda entity: entity.name
         ):
@@ -819,9 +918,11 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
             self.list_on_demand_feature_views(project=project),
             key=lambda on_demand_feature_view: on_demand_feature_view.name,
         ):
-            registry_dict["onDemandFeatureViews"].append(
-                self._message_to_sorted_dict(on_demand_feature_view.to_proto())
+            odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto())
+            odfv_dict["spec"]["userDefinedFunction"]["body"] = dill.source.getsource(
+                on_demand_feature_view.udf
             )
+            registry_dict["onDemandFeatureViews"].append(odfv_dict)
         for request_feature_view in sorted(
             self.list_request_feature_views(project=project),
             key=lambda request_feature_view: request_feature_view.name,
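A low-level round trip through the new registry methods, mirroring the integration test at the end of this diff (local registry path and project name are illustrative):

```python
from feast import FileSource
from feast.registry import Registry
from feast.repo_config import RegistryConfig

registry = Registry(RegistryConfig(path="/tmp/registry.db", cache_ttl_seconds=600), None)

source = FileSource(
    name="driver_stats",
    path="data/driver_stats.parquet",
    event_timestamp_column="event_timestamp",
)
registry.apply_data_source(source, project="demo", commit=True)

# Round trip: the proto gains the project field, and from_proto restores the name.
assert registry.get_data_source("driver_stats", "demo") == source
assert len(registry.list_data_sources("demo")) == 1

registry.delete_data_source("driver_stats", project="demo")
```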
""" + data_sources: Set[DataSource] feature_views: Set[FeatureView] on_demand_feature_views: Set[OnDemandFeatureView] request_feature_views: Set[RequestFeatureView] @@ -34,6 +36,7 @@ class RepoContents(NamedTuple): def to_registry_proto(self) -> RegistryProto: registry_proto = RegistryProto() + registry_proto.data_sources.extend([e.to_proto() for e in self.data_sources]) registry_proto.entities.extend([e.to_proto() for e in self.entities]) registry_proto.feature_views.extend( [fv.to_proto() for fv in self.feature_views] diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index cba04a9942..4bee79bd60 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -12,6 +12,7 @@ import click from click.exceptions import BadParameter +from feast.data_source import DataSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity from feast.feature_service import FeatureService @@ -94,6 +95,7 @@ def get_repo_files(repo_root: Path) -> List[Path]: def parse_repo(repo_root: Path) -> RepoContents: """ Collect feature table definitions from feature repo """ res = RepoContents( + data_sources=set(), entities=set(), feature_views=set(), feature_services=set(), @@ -106,6 +108,8 @@ def parse_repo(repo_root: Path) -> RepoContents: module = importlib.import_module(module_path) for attr_name in dir(module): obj = getattr(module, attr_name) + if isinstance(obj, DataSource): + res.data_sources.add(obj) if isinstance(obj, FeatureView): res.feature_views.add(obj) elif isinstance(obj, Entity): @@ -258,18 +262,11 @@ def teardown(repo_config: RepoConfig, repo_path: Path): @log_exceptions_and_usage def registry_dump(repo_config: RepoConfig, repo_path: Path): """ For debugging only: output contents of the metadata registry """ - from colorama import Fore, Style - registry_config = repo_config.get_registry_config() project = repo_config.project registry = Registry(registry_config=registry_config, repo_path=repo_path) registry_dict = registry.to_dict(project=project) - warning = ( - "Warning: The registry-dump command is for debugging only and may contain " - "breaking changes in the future. No guarantees are made on this interface." 
-    )
-    click.echo(f"{Style.BRIGHT}{Fore.YELLOW}{warning}{Style.RESET_ALL}")
     click.echo(json.dumps(registry_dict, indent=2, sort_keys=True))
 
diff --git a/sdk/python/feast/saved_dataset.py b/sdk/python/feast/saved_dataset.py
index 75b6d2c199..7a05a9ca22 100644
--- a/sdk/python/feast/saved_dataset.py
+++ b/sdk/python/feast/saved_dataset.py
@@ -54,6 +54,7 @@ class SavedDataset:
     full_feature_names: bool
     storage: SavedDatasetStorage
     tags: Dict[str, str]
+    feature_service_name: Optional[str] = None
 
     created_timestamp: Optional[datetime] = None
     last_updated_timestamp: Optional[datetime] = None
@@ -71,6 +72,7 @@ def __init__(
         storage: SavedDatasetStorage,
         full_feature_names: bool = False,
         tags: Optional[Dict[str, str]] = None,
+        feature_service_name: Optional[str] = None,
     ):
         self.name = name
         self.features = features
@@ -78,6 +80,7 @@ def __init__(
         self.storage = storage
         self.full_feature_names = full_feature_names
         self.tags = tags or {}
+        self.feature_service_name = feature_service_name
 
         self._retrieval_job = None
 
@@ -121,6 +124,9 @@ def from_proto(saved_dataset_proto: SavedDatasetProto):
             tags=dict(saved_dataset_proto.spec.tags.items()),
         )
 
+        if saved_dataset_proto.spec.feature_service_name:
+            ds.feature_service_name = saved_dataset_proto.spec.feature_service_name
+
         if saved_dataset_proto.meta.HasField("created_timestamp"):
             ds.created_timestamp = (
                 saved_dataset_proto.meta.created_timestamp.ToDatetime()
@@ -163,6 +169,8 @@ def to_proto(self) -> SavedDatasetProto:
             storage=self.storage.to_proto(),
             tags=self.tags,
         )
+        if self.feature_service_name:
+            spec.feature_service_name = self.feature_service_name
 
         feature_service_proto = SavedDatasetProto(spec=spec, meta=meta)
         return feature_service_proto
diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py
index e0e6b92380..8179906fa4 100644
--- a/sdk/python/tests/example_repos/example_feature_repo_1.py
+++ b/sdk/python/tests/example_repos/example_feature_repo_1.py
@@ -10,13 +10,15 @@
 )
 
 driver_locations_source = BigQuerySource(
-    table_ref="feast-oss.public.drivers",
+    table="feast-oss.public.drivers",
     event_timestamp_column="event_timestamp",
     created_timestamp_column="created_timestamp",
 )
 
 customer_profile_source = BigQuerySource(
-    table_ref="feast-oss.public.customers", event_timestamp_column="event_timestamp",
+    name="customer_profile_source",
+    table_ref="feast-oss.public.customers",
+    event_timestamp_column="event_timestamp",
 )
 
 customer_driver_combined_source = BigQuerySource(
diff --git a/sdk/python/tests/example_repos/example_repo_duplicate_data_source_names.py b/sdk/python/tests/example_repos/example_repo_duplicate_data_source_names.py
new file mode 100644
index 0000000000..5ec08b7182
--- /dev/null
+++ b/sdk/python/tests/example_repos/example_repo_duplicate_data_source_names.py
@@ -0,0 +1,9 @@
+from feast import FileSource
+
+driver_hourly_stats = FileSource(
+    path="driver_stats.parquet",  # this parquet is not real and will not be read
+)
+
+driver_hourly_stats_clone = FileSource(
+    path="driver_stats.parquet",  # this parquet is not real and will not be read
+)
diff --git a/sdk/python/tests/integration/registration/test_cli.py b/sdk/python/tests/integration/registration/test_cli.py
index 25f0ae4841..c79d672fb9 100644
--- a/sdk/python/tests/integration/registration/test_cli.py
+++ b/sdk/python/tests/integration/registration/test_cli.py
@@ -69,6 +69,8 @@ def test_universal_cli(environment: Environment):
     assertpy.assert_that(result.returncode).is_equal_to(0)
runner.run(["feature-services", "list"], cwd=repo_path) assertpy.assert_that(result.returncode).is_equal_to(0) + result = runner.run(["data-sources", "list"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) # entity & feature view describe commands should succeed when objects exist result = runner.run(["entities", "describe", "driver"], cwd=repo_path) @@ -83,6 +85,11 @@ def test_universal_cli(environment: Environment): ) assertpy.assert_that(result.returncode).is_equal_to(0) assertpy.assert_that(fs.list_feature_views()).is_length(3) + result = runner.run( + ["data-sources", "describe", "customer_profile_source"], cwd=repo_path, + ) + assertpy.assert_that(result.returncode).is_equal_to(0) + assertpy.assert_that(fs.list_data_sources()).is_length(3) # entity & feature view describe commands should fail when objects don't exist result = runner.run(["entities", "describe", "foo"], cwd=repo_path) @@ -91,6 +98,8 @@ def test_universal_cli(environment: Environment): assertpy.assert_that(result.returncode).is_equal_to(1) result = runner.run(["feature-services", "describe", "foo"], cwd=repo_path) assertpy.assert_that(result.returncode).is_equal_to(1) + result = runner.run(["data-sources", "describe", "foo"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(1) # Doing another apply should be a no op, and should not cause errors result = runner.run(["apply"], cwd=repo_path) diff --git a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py b/sdk/python/tests/integration/registration/test_cli_apply_duplicates.py similarity index 88% rename from sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py rename to sdk/python/tests/integration/registration/test_cli_apply_duplicates.py index 6987066e8d..bad3b50a80 100644 --- a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py +++ b/sdk/python/tests/integration/registration/test_cli_apply_duplicates.py @@ -6,10 +6,20 @@ def test_cli_apply_duplicated_featureview_names() -> None: - """ - Test apply feature views with duplicated names and single py file in a feature repo using CLI - """ + run_simple_apply_test( + example_repo_file_name="example_feature_repo_with_duplicated_featureview_names.py", + expected_error=b"Please ensure that all feature view names are case-insensitively unique", + ) + +def test_cli_apply_duplicate_data_source_names() -> None: + run_simple_apply_test( + example_repo_file_name="example_repo_duplicate_data_source_names.py", + expected_error=b"Please ensure that all data source names are case-insensitively unique", + ) + + +def run_simple_apply_test(example_repo_file_name: str, expected_error: bytes): with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: runner = CliRunner() # Construct an example repo in a temporary dir @@ -31,18 +41,10 @@ def test_cli_apply_duplicated_featureview_names() -> None: ) repo_example = repo_path / "example.py" - repo_example.write_text( - get_example_repo( - "example_feature_repo_with_duplicated_featureview_names.py" - ) - ) + repo_example.write_text(get_example_repo(example_repo_file_name)) rc, output = runner.run_with_output(["apply"], cwd=repo_path) - assert ( - rc != 0 - and b"Please ensure that all feature view names are case-insensitively unique" - in output - ) + assert rc != 0 and expected_error in output def test_cli_apply_imported_featureview() -> None: diff --git 
a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index ca5f56c435..c334013b51 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -1,9 +1,22 @@ import pandas as pd import pytest -from feast import Entity, Feature, RepoConfig, ValueType +from feast import ( + BigQuerySource, + Entity, + Feature, + FileSource, + RedshiftSource, + RepoConfig, + SnowflakeSource, + ValueType, +) from feast.data_source import RequestDataSource -from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError +from feast.errors import ( + DataSourceNoNameException, + RegistryInferenceFailure, + SpecifiedFeaturesNotPresentError, +) from feast.feature_view import FeatureView from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, @@ -58,6 +71,38 @@ def test_update_entities_with_inferred_types_from_feature_views( ) +def test_infer_datasource_names_file(): + file_path = "path/to/test.csv" + data_source = FileSource(path=file_path) + assert data_source.name == file_path + + source_name = "my_name" + data_source = FileSource(name=source_name, path=file_path) + assert data_source.name == source_name + + +def test_infer_datasource_names_dwh(): + table = "project.table" + dwh_classes = [BigQuerySource, RedshiftSource, SnowflakeSource] + + for dwh_class in dwh_classes: + data_source = dwh_class(table=table) + assert data_source.name == table + + source_name = "my_name" + data_source_with_table = dwh_class(name=source_name, table=table) + assert data_source_with_table.name == source_name + data_source_with_query = dwh_class( + name=source_name, query=f"SELECT * from {table}" + ) + assert data_source_with_query.name == source_name + + # If we have a query and no name, throw an error + with pytest.raises(DataSourceNoNameException): + print(f"Testing dwh {dwh_class}") + data_source = dwh_class(query="test_query") + + @pytest.mark.integration def test_update_data_sources_with_inferred_event_timestamp_col(simple_dataset_1): df_with_two_viable_timestamp_cols = simple_dataset_1.copy(deep=True) @@ -137,3 +182,56 @@ def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: with pytest.raises(SpecifiedFeaturesNotPresentError): test_view_with_missing_feature.infer_features() + + +def test_datasource_inference(): + # Create Feature Views + date_request = RequestDataSource( + name="date_request", schema={"some_date": ValueType.UNIX_TIMESTAMP} + ) + + @on_demand_feature_view( + inputs={"date_request": date_request}, + features=[ + Feature("output", ValueType.UNIX_TIMESTAMP), + Feature("string_output", ValueType.STRING), + ], + ) + def test_view(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + data["string_output"] = features_df["some_date"].astype(pd.StringDtype()) + return data + + test_view.infer_features() + + @on_demand_feature_view( + inputs={"date_request": date_request}, + features=[ + Feature("output", ValueType.UNIX_TIMESTAMP), + Feature("object_output", ValueType.STRING), + ], + ) + def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + data["object_output"] = features_df["some_date"].astype(str) + return data + + with pytest.raises(ValueError, match="Value with native type object"): + invalid_test_view.infer_features() + + @on_demand_feature_view( + 
inputs={"date_request": date_request}, + features=[ + Feature("output", ValueType.UNIX_TIMESTAMP), + Feature("missing", ValueType.STRING), + ], + ) + def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + return data + + with pytest.raises(SpecifiedFeaturesNotPresentError): + test_view_with_missing_feature.infer_features() diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 0394a52cfb..535497634d 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -32,14 +32,14 @@ @pytest.fixture -def local_registry(): +def local_registry() -> Registry: fd, registry_path = mkstemp() registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600) return Registry(registry_config, None) @pytest.fixture -def gcs_registry(): +def gcs_registry() -> Registry: from google.cloud import storage storage_client = storage.Client() @@ -58,7 +58,7 @@ def gcs_registry(): @pytest.fixture -def s3_registry(): +def s3_registry() -> Registry: registry_config = RegistryConfig( path=f"s3://feast-integration-tests/registries/{int(time.time() * 1000)}/registry.db", cache_ttl_seconds=600, @@ -428,6 +428,70 @@ def test_apply_feature_view_integration(test_registry): test_registry._get_registry_proto() +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", [lazy_fixture("gcs_registry"), lazy_fixture("s3_registry")], +) +def test_apply_data_source(test_registry: Registry): + # Create Feature Views + batch_source = FileSource( + name="test_source", + file_format=ParquetFormat(), + path="file://feast/*", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", + date_partition_column="date_partition_col", + ) + + fv1 = FeatureView( + name="my_feature_view_1", + features=[ + Feature(name="fs1_my_feature_1", dtype=ValueType.INT64), + Feature(name="fs1_my_feature_2", dtype=ValueType.STRING), + Feature(name="fs1_my_feature_3", dtype=ValueType.STRING_LIST), + Feature(name="fs1_my_feature_4", dtype=ValueType.BYTES_LIST), + ], + entities=["fs1_my_entity_1"], + tags={"team": "matchmaking"}, + batch_source=batch_source, + ttl=timedelta(minutes=5), + ) + + project = "project" + + # Register data source and feature view + test_registry.apply_data_source(batch_source, project, commit=False) + test_registry.apply_feature_view(fv1, project, commit=True) + + registry_feature_views = test_registry.list_feature_views(project) + registry_data_sources = test_registry.list_data_sources(project) + assert len(registry_feature_views) == 1 + assert len(registry_data_sources) == 1 + registry_feature_view = registry_feature_views[0] + assert registry_feature_view.batch_source == batch_source + registry_data_source = registry_data_sources[0] + assert registry_data_source == batch_source + + # Check that change to batch source propagates + batch_source.event_timestamp_column = "new_ts_col" + test_registry.apply_data_source(batch_source, project, commit=False) + test_registry.apply_feature_view(fv1, project, commit=True) + registry_feature_views = test_registry.list_feature_views(project) + registry_data_sources = test_registry.list_data_sources(project) + assert len(registry_feature_views) == 1 + assert len(registry_data_sources) == 1 + registry_feature_view = registry_feature_views[0] + assert registry_feature_view.batch_source == batch_source + 
registry_batch_source = test_registry.list_data_sources(project)[0] + assert registry_batch_source == batch_source + + test_registry.teardown() + + # Will try to reload registry, which will fail because the file has been deleted + with pytest.raises(FileNotFoundError): + test_registry._get_registry_proto() + + def test_commit(): fd, registry_path = mkstemp() registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600) diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py index 12870186bf..5a5baceef0 100644 --- a/sdk/python/tests/utils/data_source_utils.py +++ b/sdk/python/tests/utils/data_source_utils.py @@ -52,6 +52,7 @@ def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuer df, event_timestamp_column ) return BigQuerySource( + name=bq_source_using_table_ref.table_ref, query=f"SELECT * FROM {bq_source_using_table_ref.table_ref}", event_timestamp_column=event_timestamp_column, )
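
A note on the naming behavior pinned down by `test_infer_datasource_names_file` and `test_infer_datasource_names_dwh` above: an explicit `name` always wins; otherwise a file source falls back to its `path`, a warehouse source falls back to its `table`, and a query-only source has no stable identifier to fall back on, so it must raise `DataSourceNoNameException`. Below is a minimal standalone sketch of those precedence rules; `infer_source_name` and the exception message are illustrative stand-ins, not the Feast SDK internals.

```python
# Sketch of the data source name-inference precedence exercised by the new
# tests. Names here are hypothetical; only the behavior mirrors the tests.
from typing import Optional


class DataSourceNoNameException(Exception):
    """Raised when a query-based source is created without an explicit name."""


def infer_source_name(
    name: Optional[str] = None,
    table: Optional[str] = None,
    path: Optional[str] = None,
    query: Optional[str] = None,
) -> str:
    if name:
        return name  # an explicit name always wins
    if table:
        return table  # warehouse sources fall back to the table reference
    if path:
        return path  # file sources fall back to the file path
    if query:
        # a bare query carries no stable identifier, so a name is required
        raise DataSourceNoNameException()
    raise ValueError("Data source needs a name, table, path, or query.")


if __name__ == "__main__":
    assert infer_source_name(table="project.table") == "project.table"
    assert infer_source_name(path="path/to/test.csv") == "path/to/test.csv"
    assert infer_source_name(name="my_name", query="SELECT 1") == "my_name"
```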
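The duplicate-names repo added above works because both `FileSource`s point at the same parquet and therefore infer the same name. A hedged sketch of the case-insensitive uniqueness check that `feast apply` is expected to run over those names follows; `assert_unique_case_insensitive` is a hypothetical helper, not the actual Feast validation code, though the error text matches the bytes asserted in `test_cli_apply_duplicate_data_source_names`.

```python
# Sketch: case-insensitive uniqueness validation over inferred source names.
from typing import Iterable


def assert_unique_case_insensitive(names: Iterable[str], kind: str) -> None:
    seen = set()
    for name in names:
        folded = name.lower()
        if folded in seen:
            raise ValueError(
                f"More than one {kind} with name {name} found. Please ensure "
                f"that all {kind} names are case-insensitively unique."
            )
        seen.add(folded)


# Both sources in example_repo_duplicate_data_source_names.py infer the same
# name from the shared path, so the check trips:
try:
    assert_unique_case_insensitive(
        ["driver_stats.parquet", "driver_stats.parquet"], "data source"
    )
except ValueError as e:
    print(e)
```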
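The `feature_service_name` round-trip in `saved_dataset.py` follows a common proto3 pattern: a plain `string` field has no presence bit, so "unset" is modelled as the empty string and guarded on both directions of the conversion. A minimal sketch with a stand-in dataclass instead of the generated `SavedDatasetProto` (all names below are illustrative):

```python
# Sketch: optional-string round-trip through a proto3-style message,
# using a dataclass as a stand-in for the generated spec message.
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeSavedDatasetSpec:  # stand-in for the generated proto message
    name: str = ""
    feature_service_name: str = ""  # proto3 string: "" means "not set"


@dataclass
class SavedDatasetSketch:
    name: str
    feature_service_name: Optional[str] = None

    @staticmethod
    def from_spec(spec: FakeSavedDatasetSpec) -> "SavedDatasetSketch":
        ds = SavedDatasetSketch(name=spec.name)
        # Only promote the field when it was actually populated on the wire.
        if spec.feature_service_name:
            ds.feature_service_name = spec.feature_service_name
        return ds

    def to_spec(self) -> FakeSavedDatasetSpec:
        spec = FakeSavedDatasetSpec(name=self.name)
        # Skip assignment when None so the wire value stays "".
        if self.feature_service_name:
            spec.feature_service_name = self.feature_service_name
        return spec


round_tripped = SavedDatasetSketch.from_spec(SavedDatasetSketch("ds").to_spec())
assert round_tripped.feature_service_name is None
```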
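Finally, `test_apply_data_source` leans on `commit=False` batching: the data source change is staged in memory and only the subsequent `apply_feature_view(..., commit=True)` persists both objects in one write. The sketch below shows that stage-then-commit pattern in the abstract; `TinyRegistry` and its `writer` callback are hypothetical and do not reflect the actual `Registry` implementation.

```python
# Sketch: stage-in-memory, persist-on-commit, as relied upon by the test.
from typing import Callable, Dict


class TinyRegistry:
    def __init__(self, writer: Callable[[Dict[str, str]], None]):
        self._state: Dict[str, str] = {}
        self._writer = writer  # persists state, e.g. to a local or remote file

    def apply_data_source(self, name: str, payload: str, commit: bool = True):
        self._state[name] = payload  # stage in memory, overwriting by name
        if commit:
            self.commit()

    def commit(self):
        self._writer(dict(self._state))  # single write for all staged changes


stored: Dict[str, str] = {}
reg = TinyRegistry(writer=stored.update)
reg.apply_data_source("test_source", "v1", commit=False)
assert stored == {}  # nothing persisted yet
reg.commit()
assert stored["test_source"] == "v1"
```

Batching this way keeps repeated `apply` calls idempotent and avoids one remote write per object, which matters for the GCS/S3 registries the test is parametrized over.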