Skip to content

Commit

Permalink
fix: Updated AWS Athena template (#3322)
Browse files Browse the repository at this point in the history
* Update the template on how to use AWS Athena

Signed-off-by: Youngkyu OH <toping4445@gmail.com>

* Remove unnecessary imports

Signed-off-by: Youngkyu OH <toping4445@gmail.com>

* lint & format

Signed-off-by: Youngkyu OH <toping4445@gmail.com>

Signed-off-by: Youngkyu OH <toping4445@gmail.com>
  • Loading branch information
yelo-blood authored Nov 6, 2022
1 parent 80712a7 commit 5956981
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ online_store:
path: online_store.db
offline_store:
type: athena
region: ap-northeast-2
database: sampledb
region: {AWS region}
database: {The database in the data catalog to be used by Athena}
data_source: AwsDataCatalog
s3_staging_location: s3://sagemaker-yelo-test
s3_staging_location: s3://{S3 bucket to be used by Feast}
workgroup: {Workgroup for Athena}
entity_key_serialization_version: 2
35 changes: 22 additions & 13 deletions sdk/python/feast/templates/athena/feature_repo/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,28 @@

import pandas as pd

from feast import Entity, Feature, FeatureStore, FeatureView, ValueType
from feast import Entity, FeatureStore, FeatureView, Field
from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
AthenaSource,
)
from feast.types import Float64, Int64


def test_end_to_end():

try:
fs = FeatureStore(".")

# Before running this test method
# 1. Upload the driver_stats.parquet file to your S3 bucket.
# (https://github.com/feast-dev/feast-custom-offline-store-demo/tree/main/feature_repo/data)
# 2. Using AWS Glue Crawler, create a table in the data catalog. The generated table can be queried through Athena.
# 3. Specify the S3 bucket name, data source(AwsDataCatalog), database name, Athena's workgroup, etc. in feature_store.yaml

fs = FeatureStore("./feature_repo")

# Partition pruning has a significant impact on Athena's query performance and cost.
# If offline feature dataset is large, it is highly recommended to create partitions using date columns such as ('created','event_timestamp')
# The date_partition_column must be in form of YYYY-MM-DD(string) as in the beginning of the date column.

driver_hourly_stats = AthenaSource(
timestamp_field="event_timestamp",
Expand All @@ -21,31 +33,29 @@ def test_end_to_end():
database="sampledb",
data_source="AwsDataCatalog",
created_timestamp_column="created",
# date_partition_column="std_date"
# date_partition_column="std_date" #YYYY-MM-DD
)

driver = Entity(
name="driver_id",
value_type=ValueType.INT64,
description="driver id",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=timedelta(days=365),
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
entities=[driver],
ttl=timedelta(days=500),
schema=[
Field(name="conv_rate", dtype=Float64),
Field(name="acc_rate", dtype=Float64),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
batch_source=driver_hourly_stats,
source=driver_hourly_stats,
)

# apply repository
fs.apply([driver_hourly_stats, driver, driver_hourly_stats_view])

print(fs.list_data_sources())
print(fs.list_feature_views())

Expand All @@ -54,7 +64,6 @@ def test_end_to_end():
)

# Read features from offline store

feature_vector = (
fs.get_historical_features(
features=["driver_hourly_stats:conv_rate"], entity_df=entity_df
Expand Down

0 comments on commit 5956981

Please sign in to comment.