
[SPARK-33298][CORE] Introduce new API to FileCommitProtocol allow flexible file naming #33012

Closed (wants to merge 4 commits)

Conversation

c21 (Contributor) commented Jun 22, 2021

What changes were proposed in this pull request?

This PR introduces a new set of APIs, newTaskTempFile and newTaskTempFileAbsPath, inside FileCommitProtocol to allow more flexible file naming of Spark output. The major change is to pass a FileNameSpec (currently holding a prefix and an ext) into FileCommitProtocol, instead of the original ext alone, so that individual FileCommitProtocol implementations can come up with more flexible file names (e.g. with a custom prefix) for Hive/Presto bucketing - #30003. Default implementations of the added APIs are provided, so NO existing implementation of FileCommitProtocol is broken.
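
For orientation, here is a minimal sketch of the proposed API shape. The FileNameSpec fields match the class reported by the test bot below; the stand-in class name FileCommitProtocolSketch and the body of the default implementation are illustrative assumptions, not the exact committed code:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// File name spec passed to the commit protocol ("ext" was later renamed
// "suffix" during review, see below).
final case class FileNameSpec(prefix: String, ext: String)

// Simplified stand-in for FileCommitProtocol (the real class has many more
// members, plus an analogous newTaskTempFileAbsPath pair).
abstract class FileCommitProtocolSketch {
  // Old API: only the suffix ("ext") of the file name is customizable.
  def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

  // New API: both prefix and suffix are customizable. The default body below
  // is an assumption for illustration: it delegates to the old API, so
  // existing implementations that only override the old method keep working.
  def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
    require(spec.prefix.isEmpty,
      "override the spec-based API to support file name prefixes")
    newTaskTempFile(taskContext, dir, spec.ext)
  }
}
```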

Why are the changes needed?

To make commit protocol more flexible in terms of Spark output file name.
Pre-requisite of #30003.

Does this PR introduce any user-facing change?

Yes, for developers who implement/run a custom implementation of FileCommitProtocol. They can choose to implement the newly added APIs.

How was this patch tested?

Existing unit tests, as this just adds an API.

c21 (Contributor, Author) commented Jun 22, 2021

cc @cloud-fan could you help take a look once you have time? Thanks.

@github-actions github-actions bot added the CORE label Jun 22, 2021
@c21 c21 changed the title from "Introduce new API to FileCommitProtocol allow flexible file naming" to "[SPARK-33298][CORE] Introduce new API to FileCommitProtocol allow flexible file naming" Jun 22, 2021
SparkQA commented Jun 22, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44646/

SparkQA commented Jun 22, 2021

Test build #140119 has finished for PR 33012 at commit 8cc4899.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • final case class FileNameSpec(prefix: String, ext: String)

* partitioning. The "spec" parameter specifies the file name. The rest are left to the commit
* protocol implementation to decide.
*
* Important: it is the caller's responsibility to add uniquely identifying content to "spec"
A Contributor commented:

This is not true; the commit protocol needs to guarantee name uniqueness, as the caller side only gives a prefix and a suffix.

c21 (Contributor, Author) replied Jun 23, 2021:

The whole sentence is:

it is the caller's responsibility to add uniquely identifying content to "spec"
if a task is going to write out multiple files to the same dir.

I think this refers to the case where, in one task, the caller calls newTaskTempFile() multiple times with the same spec; the caller should not expect the commit protocol to return a unique file path each time. The current implementation of HadoopMapReduceCommitProtocol returns the same path if ext is the same. This sentence is copied from the original comment of newTaskTempFile.
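
For illustration, a hypothetical caller-side sketch of that convention (the counter-in-suffix naming mirrors the "file count" mentioned later in this review; all names here are invented):

```scala
// Hypothetical helper: a task that writes several files to the same directory
// makes each spec unique itself, e.g. by embedding a per-task file counter in
// the suffix, since the protocol may return the same path for identical specs.
class TaskFileNamer {
  private var fileCounter = 0

  def nextSpec(): FileNameSpec = {
    val spec = FileNameSpec(prefix = "", ext = f"-c$fileCounter%03d.snappy.parquet")
    fileCounter += 1
    spec
  }
}
```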

The Contributor replied:

I see!

* @param prefix Prefix of file.
* @param ext Extension of file.
*/
final case class FileNameSpec(prefix: String, ext: String)
cloud-fan (Contributor) commented:

Shall we rename ext to suffix? In reality it includes more than the extension, such as the file count.

c21 (Contributor, Author) replied:

@cloud-fan - agree, updated.
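
Presumably the spec then reads roughly as follows (a sketch of the agreed rename, not the exact committed code):

```scala
/**
 * Spec of the file name to create, passed into the commit protocol.
 *
 * @param prefix Prefix of the file.
 * @param suffix Suffix of the file; in reality this carries more than the
 *               extension, e.g. the per-task file count.
 */
final case class FileNameSpec(prefix: String, suffix: String)
```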

dongjoon-hyun (Member) left a comment:

Hi, @c21 and @cloud-fan.
Why do we need to deprecate the old APIs? The Apache Spark community doesn't delete deprecated APIs in general. For a deprecation in Apache Spark 3.2.0, I believe we need another discussion on the community mailing list. Could you remove the deprecation part from this PR?

cc @gengliangwang

dongjoon-hyun (Member) commented:

Also, cc @sunchao because this PR is about bucketing.

cloud-fan (Contributor) commented:

It's not a public API (it's under the package org.apache.spark.internal), and we are conservative here simply because we know some third-party libraries are using it, e.g. https://github.com/steveloughran/zero-rename-committer

The old API needs to be deprecated as it's no longer fully functional. To support Hive bucketed tables, Spark needs to ask the commit protocol to add a certain prefix to the file name, and the old API can never learn about this prefix requirement.

c21 (Contributor, Author) commented Jun 22, 2021

@dongjoon-hyun - this is not a public API. With this PR, no existing third-party library will be broken. The new API is a superset of the existing API; any third party can move forward and implement the new API in the future, but they can also stay on the old API if they need to.

SparkQA commented Jun 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44695/

SparkQA commented Jun 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44697/

SparkQA commented Jun 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44695/

SparkQA commented Jun 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44697/

SparkQA commented Jun 23, 2021

Test build #140170 has finished for PR 33012 at commit b24f40d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

dongjoon-hyun (Member) commented Jun 23, 2021

Ya, I know this is internal, but will we be able to remove that API in the future, in Apache Spark 3.3.0?
Why don't we remove the deprecation warnings from this PR?
After we file a JIRA and discuss it in an email thread, we can add them later.

gengliangwang (Member) commented Jun 23, 2021

Hmmm, shall we simply add:

def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], prefix: String, ext: String): String
def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, prefix: String, ext: String): String

For the deprecation, I think either way is fine. We can simply remove it and wait for the discussion's conclusion, as @dongjoon-hyun mentioned.

cloud-fan (Contributor) commented:

OK, since it's a semi-developer API, maybe we don't need to deprecate it. We can have a discussion on the dev list if we need to remove the old APIs one day.

cloud-fan (Contributor) commented:

taskContext: TaskAttemptContext, dir: Option[String], prefix: String, ext: String

Then we would need to break the API again if we want to customize the file name further in the future.

gengliangwang (Member) commented:

@cloud-fan It's just two functions for the temp file/path creation. Introducing a new class for this seems over-designed.

cloud-fan (Contributor) commented:

API design comes down to personal taste; I'll leave the decision to @c21.

c21 (Contributor, Author) commented Jun 23, 2021

taskContext: TaskAttemptContext, dir: Option[String], prefix: String, ext: String

@gengliangwang - I agree with @cloud-fan. The FileNameSpec class is introduced mainly for future-proofing. Whenever we need to pass more parameters to customize the file name in the future (e.g. requiring randomness/a UUID, etc.), we can just add more fields to FileNameSpec, as sketched below, without breaking the API again. Future-proofing is the main purpose here, more than encapsulation. How about keeping FileNameSpec as it is?
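
A hypothetical sketch of that future-proofing argument (the extra field is invented purely for illustration and is not part of this PR):

```scala
// Because callers construct a FileNameSpec instead of passing bare strings,
// a future naming requirement becomes one more field with a default value;
// existing call sites and API overrides keep compiling unchanged.
final case class FileNameSpec(
    prefix: String,
    suffix: String,
    includeUUID: Boolean = false) // hypothetical future knob, not in this PR
```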

Ya, I know this is internal, but are we able to remove that API in the future at Apache Spark 3.3.0?
Why don't we remove deprecation warnings from this PR?
After we file a JIRA and discuss in email thread, we can add them later.

@dongjoon-hyun - based on people's opinions here, I removed the deprecation annotations from the existing APIs. I agree with your plan. I will start a discussion on the mailing list for Spark 3.3 later, and we can add the deprecation later or just remove the existing APIs. Thanks.

SparkQA commented Jun 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44746/

SparkQA commented Jun 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44746/

SparkQA commented Jun 23, 2021

Test build #140218 has finished for PR 33012 at commit 2ec6fc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

c21 (Contributor, Author) commented Jun 24, 2021

@dongjoon-hyun - sorry for pinging again, as it's close to the branch cut. Could you help take a look again? Thanks.

dongjoon-hyun (Member) left a comment:

+1, LGTM. Thank you, @c21, @cloud-fan, @gengliangwang.

c21 (Contributor, Author) commented Jun 25, 2021

Thank you all for the review!

@c21 c21 deleted the commit-protocol-api branch June 25, 2021 00:36
*
* This API should be implemented and called, instead of
* [[newTaskTempFile(taskContext, dir, ext)]]. Provide a default implementation here to be
* backward compatible with custom [[FileCommitProtocol]] implementations before Spark 3.2.0.
HyukjinKwon (Member) commented:

I think we should mark this as an API (e.g., @Unstable) ... this is currently internal.

HyukjinKwon (Member) commented:

It's a bit weird to talk about "backward compatible" here. If this isn't an API, we should explicitly mention that it isn't an API, at least here, to avoid giving a false impression to devs.

c21 (Contributor, Author) replied:

@HyukjinKwon - would it look better as below?

This method should be implemented and called, instead of
[[newTaskTempFile(taskContext, dir, ext)]]. Provide a default implementation here to be
compatible with custom [[FileCommitProtocol]] implementations before Spark 3.2.0.

HyukjinKwon (Member) replied:

Shall we mark it as @Unstable for now, with the context explained a bit? e.g., this class is exposed as an API considering its usage in many downstream custom implementations, but it is subject to change and/or being moved.

HyukjinKwon (Member) commented:

cc @cloud-fan to get more feedback

cloud-fan (Contributor) replied:

I'm fine to mark it Unstable. It doesn't hurt anyway.

c21 (Contributor, Author) replied:

Sounds good. Let me create a PR now.

HyukjinKwon pushed a commit that referenced this pull request Jun 30, 2021
…rotocol`

### What changes were proposed in this pull request?

This is a follow-up to #33012 (comment), where we want to add `Unstable` to `FileCommitProtocol`, to give people a better idea of the API.

### Why are the changes needed?

Make it easier for people to follow and understand code. Clean up code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests, as no real logic change.

Closes #33148 from c21/bucket-followup.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
cloud-fan pushed a commit that referenced this pull request Sep 17, 2021
…ormat with Hive hash)

### What changes were proposed in this pull request?

This is a re-work of #30003; here we add support for writing Hive bucketed tables with the Parquet/ORC file formats (data source v1 write path and Hive hash as the hash function). Support for Hive's other file formats will be added in a follow-up PR.

The changes are mostly in:

* `HiveMetastoreCatalog.scala`: When converting a Hive table relation to a data source relation, pass bucket info (BucketSpec) and other Hive-related info as options into `HadoopFsRelation` and `LogicalRelation`, which can later be accessed by `FileFormatWriter` to customize the bucket id and file name.

* `FileFormatWriter.scala`: Use `HiveHash` for `bucketIdExpression` when writing to a Hive bucketed table. In addition, the Spark output file name should follow the Hive/Presto/Trino bucketed file naming convention. This introduces another parameter, `bucketFileNamePrefix`, and a subsequent change in `FileFormatDataWriter`.

* `HadoopMapReduceCommitProtocol`: Implement the new file name APIs introduced in #33012, and change its subclass `PathOutputCommitProtocol`, to make Hive bucketed table writing work with all commit protocols (including the S3A commit protocol).

### Why are the changes needed?

To make Spark write bucketed tables that are compatible with other SQL engines. Currently, Spark bucketed tables cannot be leveraged by other SQL engines like Hive and Presto, because Spark uses a different hash function (Spark murmur3hash) and a different file name scheme. With this PR, a Spark-written Hive bucketed table can be efficiently read by Presto and Hive to do bucket filter pruning, joins, group-bys, etc. This was and is blocking several companies (confirmed with Facebook, Lyft, etc.) from migrating bucketing workloads from Hive to Spark.

### Does this PR introduce _any_ user-facing change?

Yes, any Hive bucketed table (in Parquet/ORC format) written by Spark is properly bucketed and can be efficiently processed by Hive and Presto/Trino.

### How was this patch tested?

* Added a unit test in BucketedWriteWithHiveSupportSuite.scala to verify bucket file names and that each row in each bucket is written properly.
* Tested by the Lyft Spark team (Shashank Pedamallu): reading Spark-written bucketed tables from Trino, Spark, and Hive.

Closes #33432 from c21/hive-bucket-v1.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
catalinii pushed a commit to lyft/spark that referenced this pull request Oct 8, 2021
…ormat with Hive hash)