
feat: Implement spark materialization engine #3184

Merged · 19 commits merged into feast-dev:master on Sep 15, 2022

Conversation


@niklasvm (Collaborator) commented Sep 5, 2022

What this PR does / why we need it:

Implement SparkMaterializationEngine, which parallelizes writing to the online store across Spark executors. This introduces a Spark batch engine type.

How

  1. Data is queried from the Spark offline store.
  2. foreachPartition is called on the resulting Spark DataFrame, so each partition of data is processed on the worker nodes (see the sketch after this list).
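
In sketch form (assumed names, mirroring the diff excerpts that appear later in this thread):

# 1. Query the materialization window from the Spark offline store.
spark_df = offline_job.to_spark_df()

# 2. Fan the rows out to the executors; each partition is written to the
#    online store independently by the per-partition handler.
spark_df.foreachPartition(
    lambda rows: _process_by_partition(rows, spark_serialized_artifacts)
)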

Usage

The SparkMaterializationEngine is intended to work only with the SparkOfflineStore and an online store that supports parallel writes (i.e. not sqlite).

e.g. feature_store.yaml:

project: my_project
registry: data/registry.db
provider: local
batch_engine:
    type: spark.engine
offline_store:
    type: spark.offline
    spark_conf:
        spark.master: "local[*]"
        spark.ui.enabled: "false"
        spark.eventLog.enabled: "false"
        spark.sql.catalogImplementation: "hive"
        spark.sql.parser.quotedRegexColumnNames: "true"
        spark.sql.session.timeZone: "UTC"
online_store:
    type: redis
    connection_string: "localhost:6379"
entity_key_serialization_version: 2
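
With this configuration, materialization is invoked the same way as with any other engine and Feast routes the work through the Spark engine. A minimal usage sketch (the one-day window is arbitrary):

from datetime import datetime, timedelta

from feast import FeatureStore

# feature_store.yaml above selects the Spark engine via batch_engine.type.
store = FeatureStore(repo_path=".")

# Materialize the last day of data; rows for each feature view are written
# to Redis in parallel across the Spark executors.
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)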

Some considerations

  • Should we distinguish between the Spark offline store and engine the same way as Snowflake, i.e. spark.offline and spark.engine?
  • Can we introduce some larger test data sets that run as part of the integration tests?

Unit and integration tests are passing; however, this process should still be tested on a larger data set to ensure parallelization works as expected.

Which issue(s) this PR fixes:

Fixes #3167

Signed-off-by: niklasvm <niklasvm@gmail.com>
@niklasvm marked this pull request as ready for review on September 5, 2022
@codecov-commenter commented Sep 5, 2022

Codecov Report

Base: 67.02% // Head: 58.28% // This PR decreases project coverage by 8.73 percentage points ⚠️

Coverage data is based on head (f828d2b) compared to base (11a0e1a).
Patch coverage: 51.61% of modified lines in this pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3184      +/-   ##
==========================================
- Coverage   67.02%   58.28%   -8.74%     
==========================================
  Files         175      210      +35     
  Lines       15942    17689    +1747     
==========================================
- Hits        10685    10310     -375     
- Misses       5257     7379    +2122     
Flag              Coverage Δ
integrationtests  ?
unittests         58.28% <51.61%> (?)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdk/python/feast/repo_config.py 76.04% <ø> (-6.47%) ⬇️
...ration/materialization/contrib/spark/test_spark.py 50.00% <50.00%> (ø)
...ffline_stores/contrib/spark_offline_store/spark.py 33.18% <100.00%> (ø)
...sts/integration/registration/test_universal_cli.py 20.20% <0.00%> (-79.80%) ⬇️
...ts/integration/offline_store/test_offline_write.py 26.08% <0.00%> (-73.92%) ⬇️
...fline_store/test_universal_historical_retrieval.py 28.75% <0.00%> (-71.25%) ⬇️
...ests/integration/e2e/test_python_feature_server.py 29.50% <0.00%> (-70.50%) ⬇️
...dk/python/tests/integration/e2e/test_validation.py 27.55% <0.00%> (-69.30%) ⬇️
...s/integration/registration/test_universal_types.py 32.25% <0.00%> (-67.75%) ⬇️
sdk/python/feast/infra/online_stores/redis.py 28.39% <0.00%> (-66.67%) ⬇️
... and 162 more


@kevjumba self-assigned this Sep 7, 2022
Comment on lines +175 to +183
spark_df = offline_job.to_spark_df()
if self.repo_config.batch_engine.partitions != 0:
    spark_df = spark_df.repartition(
        self.repo_config.batch_engine.partitions
    )

spark_df.foreachPartition(
    lambda x: _process_by_partition(x, spark_serialized_artifacts)
)
Member review comment:
💯
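
For context, a per-partition handler along these lines is what foreachPartition invokes on each executor. This is only a sketch under assumed names: the unserialize line matches the diff excerpt shown further down, while _convert_df_to_online_format is a hypothetical stand-in for the PR's conversion logic.

def _process_by_partition(rows, spark_serialized_artifacts):
    import pandas as pd

    # Runs on the executor: rebuild the feature view, online store and repo
    # config from the pickled artifacts shipped from the driver.
    feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()

    # Collect this partition into a pandas DataFrame (rows is an iterator of
    # pyspark Row objects).
    df = pd.DataFrame([row.asDict() for row in rows])
    if df.empty:
        return

    # Convert to entity keys / protobuf values and write the batch.
    data = _convert_df_to_online_format(df, feature_view, repo_config)  # hypothetical helper
    online_store.online_write_batch(repo_config, feature_view, data, lambda x: None)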

@niklasvm requested a review from achals on September 8, 2022
@ckarwicki (Contributor) commented:
@niklasvm Can you fix the integration tests? We are also waiting on this PR and would like to use it. Did you get a chance to test it on a cluster? Feast would have to be provided to the worker nodes, since they deserialize the config. We could add an example of how to do that.

@niklasvm (Collaborator, Author) commented:
> @niklasvm Can you fix the integration tests? We are also waiting on this PR and would like to use it. Did you get a chance to test it on a cluster? Feast would have to be provided to the worker nodes, since they deserialize the config. We could add an example of how to do that.

@ckarwicki I didn't realise the integration test failed. It looks like the issue is related to the Redis Docker container not accepting connections. I will rerun the pipeline.

I have not tested this on a cluster, only in Spark local mode. What type of cluster are you using?
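
On the question of providing Feast to the worker nodes: one generic way to do this (a Spark 3.1+ pattern, not something this PR prescribes) is to ship a packed virtualenv containing feast via Spark's archives mechanism:

from pyspark.sql import SparkSession

# Assumption: feast_env.tar.gz is a relocatable virtualenv containing feast
# and its dependencies, e.g. built with venv-pack. Spark unpacks it on each
# executor under the alias "environment".
spark = (
    SparkSession.builder
    .config("spark.archives", "feast_env.tar.gz#environment")
    .config("spark.pyspark.python", "./environment/bin/python")
    .getOrCreate()
)

The same two keys could equally be set under spark_conf in feature_store.yaml so that the offline store's session picks them up.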

# unserialize artifacts
feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()

if feature_view.batch_source.field_mapping is not None:
Collaborator review comment: since lines 249 to 257 are also used in feature_store.write_to_online_store, maybe it makes sense to refactor this into a util method?
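
A util along the lines the comment suggests might look like this; a sketch of the idea only, with a hypothetical name, not code from the PR:

import pandas as pd

def apply_field_mapping(df: pd.DataFrame, field_mapping: dict) -> pd.DataFrame:
    # Rename source columns to the feature view's expected names so the same
    # logic can back both materialization and feature_store.write_to_online_store.
    return df.rename(columns=field_mapping)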

)


class SparkMaterializationEngineConfig(FeastConfigBaseModel):
Collaborator review comment: probably makes sense to throw an error somewhere if the offline store is not the SparkOfflineStore?
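
Such a guard might look like the following; a minimal sketch of the suggestion, not the PR's code (the config class path is taken from the impacted-files list above):

from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
    SparkOfflineStoreConfig,
)

# Hypothetical check at engine initialization: fail fast when the configured
# offline store cannot produce a Spark DataFrame.
if not isinstance(repo_config.offline_store, SparkOfflineStoreConfig):
    raise TypeError(
        "SparkMaterializationEngine is only compatible with the SparkOfflineStore"
    )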

@ckarwicki (Contributor) commented Sep 13, 2022

@niklasvm We are running a Bitnami Spark cluster on EKS.

@hao-affirm (Contributor) commented:
I'm also looking forward to using this!

@niklasvm (Collaborator, Author) commented:
@adchia what is left before this can be merged? I see there is one test failing; however, the failures are unrelated to this PR.

@adchia (Collaborator) left a review comment:
/lgtm

@feast-ci-bot (Collaborator) commented:
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: adchia, niklasvm

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@feast-ci-bot merged commit a59c33a into feast-dev:master on Sep 15, 2022
felixwang9817 pushed a commit that referenced this pull request Sep 20, 2022
# 0.25.0 (v0.24.0...v0.25.0) (2022-09-20)

### Bug Fixes

* Broken Feature Service Link (#3227) (e117082)
* Feature-server image is missing mysql dependency for mysql registry (#3223) (ae37b20)
* Fix handling of TTL in Go server (#3232) (f020630)
* Fix materialization when running on Spark cluster (#3166) (175fd25)
* Fix push API to respect feature view's already inferred entity types (#3172) (7c50ab5)
* Fix release workflow (#3144) (20a9dd9)
* Fix Shopify timestamp bug and add warnings to help with debugging entity registration (#3191) (de75971)
* Handle complex Spark data types in SparkSource (#3154) (5ddb83b)
* Local staging location provision (#3195) (cdf0faf)
* Remove bad snowflake offline store method (#3204) (dfdd0ca)
* Remove opening file object when validating S3 parquet source (#3217) (a906018)
* Snowflake config file search error (#3193) (189afb9)
* Update Snowflake Online docs (#3206) (7bc1dff)

### Features

* Add `to_remote_storage` functionality to `SparkOfflineStore` (#3175) (2107ce2)
* Add ability to give boto extra args for registry config (#3219) (fbc6a2c)
* Add health endpoint to py server (#3202) (43222f2)
* Add snowflake support for date & number with scale (#3148) (50e8755)
* Add tag kwarg to set Snowflake online store table path (#3176) (39aeea3)
* Add workgroup to athena offline store config (#3139) (a752211)
* Implement spark materialization engine (#3184) (a59c33a)
Successfully merging this pull request may close these issues:

Add Spark materialization engine for parallel, distributed materialization of large datasets.