Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Metadata changes & making data sources top level objects to power Feast UI #2336

Merged
merged 12 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";

// Defines a Data Source that can be used to source Feature data
// Next available id: 22
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
Expand All @@ -45,6 +46,13 @@ message DataSource {
REQUEST_SOURCE = 7;

}

// Unique name of data source within the project
string name = 20;

// Name of Feast project that this data source belongs to.
string project = 21;

SourceType type = 1;

// Defines mapping between fields in the sourced data
Expand Down Expand Up @@ -156,9 +164,7 @@ message DataSource {

// Defines options for DataSource that sources features from request data
message RequestDataOptions {
// Name of the request data source
string name = 1;

reserved 1;
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 2;
}
Expand Down
2 changes: 0 additions & 2 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ message OnDemandFeatureViewSpec {
map<string, OnDemandInput> inputs = 4;

UserDefinedFunction user_defined_function = 5;


}

message OnDemandFeatureViewMeta {
Expand Down
3 changes: 3 additions & 0 deletions protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import "feast/core/FeatureView.proto";
import "feast/core/InfraObject.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/RequestFeatureView.proto";
import "feast/core/DataSource.proto";
import "feast/core/SavedDataset.proto";
import "google/protobuf/timestamp.proto";

// Next id: 13
message Registry {
repeated Entity entities = 1;
repeated FeatureTable feature_tables = 2;
repeated FeatureView feature_views = 6;
repeated DataSource data_sources = 12;
repeated OnDemandFeatureView on_demand_feature_views = 8;
repeated RequestFeatureView request_feature_views = 9;
repeated FeatureService feature_services = 7;
Expand Down
2 changes: 0 additions & 2 deletions protos/feast/core/RequestFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
option java_outer_classname = "RequestFeatureViewProto";
option java_package = "feast.proto.core";

import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";

message RequestFeatureView {
Expand Down
5 changes: 4 additions & 1 deletion protos/feast/core/SavedDataset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ option java_outer_classname = "SavedDatasetProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/FeatureViewProjection.proto";
import "feast/core/DataSource.proto";
import "feast/core/FeatureService.proto";

message SavedDatasetSpec {
// Name of the dataset. Must be unique since it's possible to overwrite dataset by name
Expand All @@ -44,6 +44,9 @@ message SavedDatasetSpec {

SavedDatasetStorage storage = 6;

// Optional and only populated if generated from a feature service fetch
string feature_service_name = 8;

// User defined metadata
map<string, string> tags = 7;
}
Expand Down
1 change: 0 additions & 1 deletion protos/feast/core/ValidationProfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ option java_package = "feast.proto.core";
option java_outer_classname = "ValidationProfile";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/SavedDataset.proto";

message GEValidationProfiler {
Expand Down
1 change: 0 additions & 1 deletion protos/feast/storage/Redis.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

syntax = "proto3";

import "feast/types/Field.proto";
import "feast/types/Value.proto";

package feast.storage;
Expand Down
50 changes: 50 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,56 @@ def endpoint(ctx: click.Context):
_logger.info("There is no active feature server.")


# Click command group: `feast data-sources <subcommand>`.
# The docstring below is surfaced verbatim by click as the group's help text.
@cli.group(name="data-sources")
def data_sources_cmd():
    """
    Access data sources
    """
    # Container only; subcommands attach via @data_sources_cmd.command(...).
    pass


@data_sources_cmd.command("describe")
@click.argument("name", type=click.STRING)
@click.pass_context
def data_source_describe(ctx: click.Context, name: str):
    """
    Describe a data source
    """
    # Resolve the feature repository directory selected via the CLI and
    # build a FeatureStore bound to it.
    repo = ctx.obj["CHDIR"]
    cli_check_repo(repo)
    store = FeatureStore(repo_path=str(repo))

    try:
        data_source = store.get_data_source(name)
    except FeastObjectNotFoundException as e:
        # Report the lookup failure on stderr and terminate with status 1.
        # `raise SystemExit(msg)` replaces the original `print(e); exit(1)`:
        # the `exit()` builtin is installed by the `site` module for
        # interactive use and is not guaranteed to exist (e.g. `python -S`),
        # and error text belongs on stderr for a CLI tool.
        raise SystemExit(e)

    # Round-trip through YAML so the data source renders as readable,
    # insertion-ordered YAML rather than its raw repr.
    print(
        yaml.dump(
            yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
        )
    )


@data_sources_cmd.command(name="list")
@click.pass_context
def data_source_list(ctx: click.Context):
    """
    List all data sources
    """
    # Locate the feature repository chosen on the command line and open it.
    repo_path = ctx.obj["CHDIR"]
    cli_check_repo(repo_path)
    store = FeatureStore(repo_path=str(repo_path))

    # One row per registered data source: its name and implementing class.
    rows = [[ds.name, ds.__class__] for ds in store.list_data_sources()]

    from tabulate import tabulate

    print(tabulate(rows, headers=["NAME", "CLASS"], tablefmt="plain"))


@cli.group(name="entities")
def entities_cmd():
"""
Expand Down
57 changes: 42 additions & 15 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class DataSource(ABC):
DataSource that can be used to source features.

Args:
name: Name of data source, which should be unique within a project
event_timestamp_column (optional): Event timestamp column used for point in time
joins of feature values.
created_timestamp_column (optional): Timestamp column indicating when the row
Expand All @@ -149,19 +150,22 @@ class DataSource(ABC):
date_partition_column (optional): Timestamp column used for partitioning.
"""

name: str
event_timestamp_column: str
created_timestamp_column: str
field_mapping: Dict[str, str]
date_partition_column: str

def __init__(
self,
name: str,
event_timestamp_column: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
):
"""Creates a DataSource object."""
self.name = name
self.event_timestamp_column = (
event_timestamp_column if event_timestamp_column else ""
)
Expand All @@ -173,12 +177,16 @@ def __init__(
date_partition_column if date_partition_column else ""
)

def __hash__(self):
    """
    Hash a DataSource by its project-unique name.

    The previous implementation hashed `(id(self), self.name)`; including
    `id(self)` makes two objects that compare equal hash differently,
    violating the hash/eq contract and breaking membership tests in sets
    and dict keys. Hashing on `name` alone is safe because `name` is one
    of the fields `__eq__` compares.
    """
    return hash(self.name)

def __eq__(self, other):
if not isinstance(other, DataSource):
raise TypeError("Comparisons should only involve DataSource class objects.")

if (
self.event_timestamp_column != other.event_timestamp_column
self.name != other.name
or self.event_timestamp_column != other.event_timestamp_column
or self.created_timestamp_column != other.created_timestamp_column
or self.field_mapping != other.field_mapping
or self.date_partition_column != other.date_partition_column
Expand Down Expand Up @@ -206,7 +214,9 @@ def from_proto(data_source: DataSourceProto) -> Any:
cls = get_data_source_class_from_type(data_source.data_source_class_type)
return cls.from_proto(data_source)

if data_source.file_options.file_format and data_source.file_options.file_url:
if data_source.request_data_options and data_source.request_data_options.schema:
data_source_obj = RequestDataSource.from_proto(data_source)
elif data_source.file_options.file_format and data_source.file_options.file_url:
from feast.infra.offline_stores.file_source import FileSource

data_source_obj = FileSource.from_proto(data_source)
Expand Down Expand Up @@ -246,7 +256,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
@abstractmethod
def to_proto(self) -> DataSourceProto:
"""
Converts this DataSource object to its protobuf (DataSourceProto) representation.
"""
raise NotImplementedError

Expand Down Expand Up @@ -296,6 +306,7 @@ def get_table_column_names_and_types(

def __init__(
self,
name: str,
event_timestamp_column: str,
bootstrap_servers: str,
message_format: StreamFormat,
Expand All @@ -305,6 +316,7 @@ def __init__(
date_partition_column: Optional[str] = "",
):
super().__init__(
name,
event_timestamp_column,
created_timestamp_column,
field_mapping,
Expand Down Expand Up @@ -335,6 +347,7 @@ def __eq__(self, other):
@staticmethod
def from_proto(data_source: DataSourceProto):
return KafkaSource(
name=data_source.name,
field_mapping=dict(data_source.field_mapping),
bootstrap_servers=data_source.kafka_options.bootstrap_servers,
message_format=StreamFormat.from_proto(
Expand All @@ -348,6 +361,7 @@ def from_proto(data_source: DataSourceProto):

def to_proto(self) -> DataSourceProto:
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.STREAM_KAFKA,
field_mapping=self.field_mapping,
kafka_options=self.kafka_options.to_proto(),
Expand All @@ -363,6 +377,9 @@ def to_proto(self) -> DataSourceProto:
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.redshift_to_feast_value_type

def get_table_query_string(self) -> str:
raise NotImplementedError


class RequestDataSource(DataSource):
"""
Expand All @@ -373,19 +390,14 @@ class RequestDataSource(DataSource):
schema: Schema mapping from the input feature name to a ValueType
"""

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError

name: str
schema: Dict[str, ValueType]

def __init__(
self, name: str, schema: Dict[str, ValueType],
):
"""Creates a RequestDataSource object."""
super().__init__()
self.name = name
super().__init__(name)
self.schema = schema

def validate(self, config: RepoConfig):
Expand All @@ -402,21 +414,28 @@ def from_proto(data_source: DataSourceProto):
schema = {}
for key in schema_pb.keys():
schema[key] = ValueType(schema_pb.get(key))
return RequestDataSource(
name=data_source.request_data_options.name, schema=schema
)
return RequestDataSource(name=data_source.name, schema=schema)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for key, value in self.schema.items():
schema_pb[key] = value.value
options = DataSourceProto.RequestDataOptions(name=self.name, schema=schema_pb)
options = DataSourceProto.RequestDataOptions(schema=schema_pb)
data_source_proto = DataSourceProto(
type=DataSourceProto.REQUEST_SOURCE, request_data_options=options
name=self.name,
type=DataSourceProto.REQUEST_SOURCE,
request_data_options=options,
)

return data_source_proto

def get_table_query_string(self) -> str:
raise NotImplementedError

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError


class KinesisSource(DataSource):
def validate(self, config: RepoConfig):
Expand All @@ -430,6 +449,7 @@ def get_table_column_names_and_types(
@staticmethod
def from_proto(data_source: DataSourceProto):
return KinesisSource(
name=data_source.name,
field_mapping=dict(data_source.field_mapping),
record_format=StreamFormat.from_proto(
data_source.kinesis_options.record_format
Expand All @@ -445,8 +465,12 @@ def from_proto(data_source: DataSourceProto):
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
pass

def get_table_query_string(self) -> str:
raise NotImplementedError

def __init__(
self,
name: str,
event_timestamp_column: str,
created_timestamp_column: str,
record_format: StreamFormat,
Expand All @@ -456,6 +480,7 @@ def __init__(
date_partition_column: Optional[str] = "",
):
super().__init__(
name,
event_timestamp_column,
created_timestamp_column,
field_mapping,
Expand All @@ -475,7 +500,8 @@ def __eq__(self, other):
)

if (
self.kinesis_options.record_format != other.kinesis_options.record_format
self.name != other.name
or self.kinesis_options.record_format != other.kinesis_options.record_format
or self.kinesis_options.region != other.kinesis_options.region
or self.kinesis_options.stream_name != other.kinesis_options.stream_name
):
Expand All @@ -485,6 +511,7 @@ def __eq__(self, other):

def to_proto(self) -> DataSourceProto:
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.STREAM_KINESIS,
field_mapping=self.field_mapping,
kinesis_options=self.kinesis_options.to_proto(),
Expand Down
Loading