Add Spark materialization engine for parallel, distributed materialization of large datasets. #3167

Closed
ckarwicki opened this issue Sep 1, 2022 · 10 comments · Fixed by #3184
Labels
kind/feature New feature or request

Comments

ckarwicki (Contributor) commented Sep 1, 2022

Is your feature request related to a problem? Please describe.

The current implementation of the Spark offline store doesn't have a Spark-based materialization engine. This makes materialization slow and inefficient, and it makes the Spark offline store much less useful, since materialization still happens on the driver node and is limited by its resources.

Describe the solution you'd like
A Spark-based materialization engine.

Describe alternatives you've considered
BytewaxMaterializationEngine - it relies on offline_job.to_remote_storage(), but SparkRetrievalJob doesn't support to_remote_storage(). Also, we would rather use one stack for job execution (preferably Spark) instead of two.

Additional context
A spark_materialization_engine would make Feast highly scalable and let it leverage Spark's full potential. Right now it is very limited.

adchia (Collaborator) commented Sep 1, 2022

Hey! Thanks for the FR. Are you in the Slack? There have been recent discussions about this topic in the #feast-development Slack channel.

niklasvm (Collaborator) commented Sep 4, 2022

Hi @ckarwicki

I'm taking a look at potential designs for a SparkBatchMaterializationEngine. In the meantime, I've got an open PR that implements the to_remote_storage method for SparkRetrievalJob. I currently see 3 solutions:

(1) The default LocalMaterializationEngine writes the data to the local file system on the driver node and then loops over each parquet file in series. One possible solution is to parallelise this process; however, it will still be limited by the size of the driver node.

(2) Alternatively, data can be written to remote storage (like S3) and the LambdaMaterializationEngine can be used; however, this requires the use of AWS Lambda.

(3) I don't know if it's possible to parallelise the code from the LocalMaterializationEngine to run over the executor nodes.

Any thoughts or ideas?

ckarwicki (Contributor, Author) commented Sep 5, 2022

@niklasvm We shouldn't rely on parquet files. This should all be done in the executors, which will make materialization distributed. Initially we can parallelize _materialize_one(), and after that materialize(). We can start very simple: take the code from LocalMaterializationEngine and parallelize it in SparkBatchMaterializationEngine. Take out table = offline_job.to_arrow(); it is inefficient. Then take the code from https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/materialization/local_engine.py#L158-L178 and put it inside https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.foreachPartition.html#pyspark.sql.DataFrame.foreachPartition. Using foreachPartition is recommended when writing to external stores.

We basically have a SparkRetrievalJob after the call to self.offline_store.pull_latest_from_table_or_query(), so we execute that and get a sql.DataFrame, which is just a df. We call foreachPartition() on that df with the modified code from LocalMaterializationEngine, which will write the rows of the sql.DataFrame to the online store from the executors. foreachPartition is an action, so calling it from the driver triggers the job, which is fine. With this approach materialization will be handled by the executors in a parallel, distributed way.
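
A minimal sketch of this approach, assuming spark_df is the pyspark.sql.DataFrame produced by executing the SparkRetrievalJob; connect_to_online_store and write_rows_to_online_store are hypothetical stand-ins for the per-row write logic adapted from local_engine.py, not actual Feast internals:

```python
# Hypothetical sketch: materialize from the executors with foreachPartition.
# `spark_df` is the DataFrame from the offline store query; the two helpers
# below are placeholders for code adapted from LocalMaterializationEngine.

def _materialize_partition(rows):
    # Runs on an executor; `rows` is an iterator of pyspark.sql.Row objects.
    # Opening one connection per partition and reusing it for all rows is the
    # main reason to prefer foreachPartition over foreach here.
    conn = connect_to_online_store()            # hypothetical connection helper
    for row in rows:
        write_rows_to_online_store(conn, row)   # hypothetical, adapted from local_engine.py
    conn.close()

# foreachPartition is an action, so this call from the driver triggers the job;
# the function itself executes on the executors, one invocation per partition.
spark_df.foreachPartition(_materialize_partition)
```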

niklasvm (Collaborator) commented Sep 5, 2022

Thanks @ckarwicki for the explanation. I have actually gone ahead and implemented something quite similar in #3184, but instead of using foreachPartition I've used applyInPandas. I'm not sure which is superior.

ckarwicki (Contributor, Author) commented Sep 5, 2022

@niklasvm I'm not sure we should use applyInPandas. It is memory-intensive and requires a full shuffle: https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html

This function requires a full shuffle. All the data of a group will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory.

It works in three steps (split-apply-combine); see: https://docs.databricks.com/spark/latest/spark-sql/pandas-function-apis.html#grouped-map

We should use foreachPartition() instead.
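
For contrast, a hedged sketch of the applyInPandas pattern (the grouping column "entity_id" and the write helper are illustrative, not taken from #3184): it requires a groupBy, so Spark must shuffle every group to a single executor and load it into memory as a pandas DataFrame before the function runs.

```python
import pandas as pd

def write_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical: push this group's rows to the online store here.
    # applyInPandas must return a pandas DataFrame matching the declared
    # schema, so return an empty frame with the same columns.
    return pdf.head(0)

# The groupBy is what forces the full shuffle and the per-group memory
# pressure described in the docs quoted above.
spark_df.groupBy("entity_id").applyInPandas(write_group, schema=spark_df.schema)
```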

niklasvm (Collaborator) commented Sep 5, 2022

That is very useful, thank you. I'll update the PR.

ckarwicki (Contributor, Author) commented Sep 5, 2022

@niklasvm You should also call repartition() on the df before calling foreachPartition(), where the number of partitions is the number of simultaneous connections (writes) to the online store. We can call it without arguments; it will create 200 partitions (the number is taken from spark.sql.shuffle.partitions). That will be the number of simultaneous connections (parallel writes) to Redis. The Redis default is 10K connections; you can change it on the Redis side if you need more.
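
A short sketch of that, reusing spark_df and _materialize_partition from the earlier example (the partition count is illustrative):

```python
# Control write parallelism: each partition becomes one concurrent connection
# to the online store (e.g. Redis). 200 mirrors the default value of
# spark.sql.shuffle.partitions; tune it to what the online store can handle.
num_parallel_writes = 200

spark_df.repartition(num_parallel_writes).foreachPartition(_materialize_partition)
```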

niklasvm (Collaborator) commented Sep 6, 2022

I believe repartition will cause a shuffle operation. Here's the DAG that uses repartition:

[screenshot: Spark DAG with the repartition stage]

And without repartition:

[screenshot: Spark DAG without repartition]

Is the intention to only repartition to a number that Redis can handle? Unfortunately, numPartitions is a required parameter of repartition.

Could we use coalesce()?

ckarwicki (Contributor, Author) commented Sep 7, 2022

@niklasvm coalesce() should only be used when reducing the number of partitions. It would be useful to call repartition() since it allows control over the parallelism of materialization and the number of simultaneous writes. The docs say it should be possible to call it without arguments:

numPartitions : int
can be an int to specify the target number of partitions or a Column. If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used.

niklasvm (Collaborator) commented Sep 8, 2022

Thanks @ckarwicki. I have tested running this without a parameter and it fails.
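
A possible workaround, assuming spark is the active SparkSession: read the shuffle-partition setting explicitly and pass it, since PySpark's DataFrame.repartition() does not allow omitting numPartitions entirely.

```python
# Read the configured default (falling back to 200) and repartition explicitly.
num_partitions = int(spark.conf.get("spark.sql.shuffle.partitions", "200"))
spark_df = spark_df.repartition(num_partitions)
```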
