feat: Feast Spark Offline Store (feast-dev#2349)
* State of feast
* Clean up changes
* Fix random incorrect changes
* Fix lint
* Fix build errors
* Fix lint
* Add spark offline store components to test against current integration tests
* Fix lint
* Rename to pass checks
* Fix issues
* Fix type checking issues
* Fix lint
* Clean up print statements for first review
* Fix lint
* Fix flake 8 lint tests
* Add warnings for alpha version release
* Format
* Address review
* Address review
* Fix lint
* Add file store functionality
* lint
* Add example feature repo
* Update data source creator
* Make cli work for feast init with spark
* Update the docs
* Clean up code
* Clean up more code
* Uncomment repo configs
* Fix setup.py
* Update dependencies
* Fix ci dependencies
* Screwed up rebase (×3)
* Realign with master
* Fix accidental changes
* Make type map change cleaner
* Address review comments
* Fix tests accidentally broken
* Add comments
* Reformat
* Fix logger
* Remove unused imports
* Fix imports
* Fix CI dependencies
* Prefix destinations with project name
* Update comment
* Fix 3.8
* temporary fix
* rollback
* update
* Update ci?
* Move third party to contrib
* Fix imports
* Remove third_party refactor
* Revert ci requirements and update comment in type map
* Revert 3.8-requirements

Signed-off-by: Kevin Zhang <kzhang@tecton.ai> (all commits above, except "Fix CI dependencies", which is Signed-off-by: Danny Chiao <danny@tecton.ai>)

Co-authored-by: Danny Chiao <danny@tecton.ai>
Signed-off-by: Achal Shah <achals@gmail.com>
2 people authored and achals committed Mar 8, 2022
1 parent 7e85d6c commit 12aafc2
Showing 4 changed files with 10 additions and 26 deletions.
12 changes: 3 additions & 9 deletions docs/reference/data-sources/spark.md
@@ -13,9 +13,7 @@ The spark data source API allows for the retrieval of historical feature values
 Using a table reference from SparkSession(for example, either in memory or a Hive Metastore)
 
 ```python
-from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
-    SparkSource,
-)
+from feast import SparkSource
 
 my_spark_source = SparkSource(
     table="FEATURE_TABLE",
@@ -25,9 +23,7 @@ my_spark_source = SparkSource(
 Using a query
 
 ```python
-from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
-    SparkSource,
-)
+from feast import SparkSource
 
 my_spark_source = SparkSource(
     query="SELECT timestamp as ts, created, f1, f2 "
@@ -38,9 +34,7 @@ my_spark_source = SparkSource(
 Using a file reference
 
 ```python
-from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
-    SparkSource,
-)
+from feast import SparkSource
 
 my_spark_source = SparkSource(
     path=f"{CURRENT_DIR}/data/driver_hourly_stats",
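The docs above cover declaring a `SparkSource`; to actually use the Spark offline store (alpha at the time of this commit) it is typically paired with a `feature_store.yaml` along these lines. This is a sketch, assuming the contrib store registers under the type string `spark` and passes `spark_conf` entries through to the `SparkSession`; the exact schema should be checked against the released Feast docs:

```yaml
project: my_project
registry: data/registry.db
provider: local
offline_store:
    type: spark
    spark_conf:
        spark.master: "local[*]"
        spark.sql.session.timeZone: "UTC"
```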
4 changes: 4 additions & 0 deletions sdk/python/feast/__init__.py
@@ -3,6 +3,9 @@
 from pkg_resources import DistributionNotFound, get_distribution
 
 from feast.infra.offline_stores.bigquery_source import BigQuerySource
+from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
+    SparkSource,
+)
 from feast.infra.offline_stores.file_source import FileSource
 from feast.infra.offline_stores.redshift_source import RedshiftSource
 from feast.infra.offline_stores.snowflake_source import SnowflakeSource
@@ -47,4 +50,5 @@
     "RedshiftSource",
     "RequestFeatureView",
     "SnowflakeSource",
+    "SparkSource",
 ]
8 changes: 3 additions & 5 deletions sdk/python/feast/inference.py
@@ -8,6 +8,7 @@
     FileSource,
     RedshiftSource,
     SnowflakeSource,
+    SparkSource,
 )
 from feast.data_source import DataSource, RequestDataSource
 from feast.errors import RegistryInferenceFailure
@@ -86,10 +87,8 @@ def update_data_sources_with_inferred_event_timestamp_col(
     ):
         # prepare right match pattern for data source
         ts_column_type_regex_pattern = ""
-        # TODO(adchia): Move Spark source inference out of this logic
-        if (
-            isinstance(data_source, FileSource)
-            or "SparkSource" == data_source.__class__.__name__
+        if isinstance(data_source, FileSource) or isinstance(
+            data_source, SparkSource
         ):
             ts_column_type_regex_pattern = r"^timestamp"
         elif isinstance(data_source, BigQuerySource):
@@ -104,7 +103,6 @@
                 f"""
                 DataSource inferencing of event_timestamp_column is currently only supported
                 for FileSource, SparkSource, BigQuerySource, RedshiftSource, and SnowflakeSource.
-                Attempting to infer from {data_source}.
                 """,
             )
         # for informing the type checker
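The `isinstance`-based branch above only picks a regex; the inference step then matches that pattern against each column's *type* to find timestamp candidates. A minimal standalone sketch of that matching step (the `^timestamp` pattern for File/Spark sources is taken from the diff; the column names and type strings are made-up examples, not output of a real source):

```python
import re

# Pattern used for FileSource and SparkSource in the diff above.
ts_column_type_regex_pattern = r"^timestamp"

# Hypothetical (column name, column type) pairs as a data source might report them.
columns = [
    ("event_timestamp", "timestamp"),
    ("created", "timestamp[us]"),
    ("driver_id", "int64"),
]

# Keep columns whose type string matches the timestamp pattern.
ts_candidates = [
    name for name, col_type in columns
    if re.match(ts_column_type_regex_pattern, col_type)
]
```

With these example columns, both timestamp-typed columns survive the filter and the integer column is dropped.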
@@ -8,7 +8,6 @@
 from pyspark.sql import SparkSession
 
 from feast.data_source import DataSource
-from feast.errors import DataSourceNoNameException
 from feast.infra.offline_stores.offline_utils import get_temp_entity_table_name
 from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.protos.feast.core.SavedDataset_pb2 import (
@@ -31,7 +30,6 @@ class SparkSourceFormat(Enum):
 class SparkSource(DataSource):
     def __init__(
         self,
-        name: Optional[str] = None,
         table: Optional[str] = None,
         query: Optional[str] = None,
         path: Optional[str] = None,
@@ -41,15 +39,7 @@ def __init__(
         field_mapping: Optional[Dict[str, str]] = None,
         date_partition_column: Optional[str] = None,
     ):
-        # If no name, use the table_ref as the default name
-        _name = name
-        if not _name:
-            if table:
-                _name = table
-            else:
-                raise DataSourceNoNameException()
         super().__init__(
-            _name,
             event_timestamp_column,
             created_timestamp_column,
             field_mapping,
@@ -116,7 +106,6 @@ def from_proto(data_source: DataSourceProto) -> Any:
 
         spark_options = SparkOptions.from_proto(data_source.custom_options)
         return SparkSource(
-            name=data_source.name,
             field_mapping=dict(data_source.field_mapping),
             table=spark_options.table,
             query=spark_options.query,
@@ -129,7 +118,6 @@ def from_proto(data_source: DataSourceProto) -> Any:
 
     def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
-            name=self.name,
             type=DataSourceProto.CUSTOM_SOURCE,
             field_mapping=self.field_mapping,
             custom_options=self.spark_options.to_proto(),
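The `from_proto`/`to_proto` pair above carries the Spark-specific fields (`table`, `query`, `path`) through the generic proto's `custom_options` blob, which is how a `CUSTOM_SOURCE` stays round-trippable through the registry. A simplified, self-contained sketch of that round-trip pattern, using a plain dict in place of the real protobuf messages (the class and method names here are illustrative, not Feast's API):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SparkOptionsSketch:
    # Exactly one of table / query / path is expected to be set.
    table: Optional[str] = None
    query: Optional[str] = None
    path: Optional[str] = None

    def to_proto(self) -> dict:
        # Stand-in for serializing into DataSourceProto.custom_options.
        return {"table": self.table, "query": self.query, "path": self.path}

    @classmethod
    def from_proto(cls, custom_options: dict) -> "SparkOptionsSketch":
        # Stand-in for parsing DataSourceProto.custom_options back out.
        return cls(**custom_options)


opts = SparkOptionsSketch(table="FEATURE_TABLE")
roundtripped = SparkOptionsSketch.from_proto(opts.to_proto())
```

The design point is that the registry proto never needs to know about Spark: any source type that can pack and unpack its own options blob survives the round trip unchanged.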
