
Add leafNodeDefaultParallelism support #3408

Merged: 3 commits merged into NVIDIA:branch-21.10 on Sep 9, 2021

Conversation

wbo4958 (Collaborator) commented Sep 8, 2021

This PR fixes #1925 by adding leafNodeDefaultParallelism to the shims layer.

Signed-off-by: Bobby Wang <wbo4958@gmail.com>
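
As a rough illustration of the approach described above, here is a minimal sketch of what such a shim-layer hook could look like; the trait and class names below are assumptions for illustration, not the exact code added by this PR:

  import org.apache.spark.sql.SparkSession

  // Illustrative sketch only; the names here are assumptions, not the code from this PR.
  trait SparkShims {
    // Default partition count for leaf nodes such as GpuRangeExec.
    def leafNodeDefaultParallelism(session: SparkSession): Int
  }

  // Shims for Spark versions before 3.2.0: the config does not exist there,
  // so fall back to SparkContext.defaultParallelism, matching CPU Spark.
  class PreSpark320Shims extends SparkShims {
    override def leafNodeDefaultParallelism(session: SparkSession): Int =
      session.sparkContext.defaultParallelism
  }

  // Shims for Spark 3.2.0+: honor spark.sql.leafNodeDefaultParallelism when it is set,
  // mirroring what SparkSession does in Spark 3.2.0.
  class Spark320Shims extends SparkShims {
    override def leafNodeDefaultParallelism(session: SparkSession): Int =
      session.conf.getOption("spark.sql.leafNodeDefaultParallelism")
        .map(_.toInt)
        .getOrElse(session.sparkContext.defaultParallelism)
  }
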
wbo4958 (Collaborator, Author) commented Sep 8, 2021

build

@@ -378,7 +378,8 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range
   val start: Long = range.start
   val end: Long = range.end
   val step: Long = range.step
-  val numSlices: Int = range.numSlices.getOrElse(sparkContext.defaultParallelism)
+  val numSlices: Int = range.numSlices.getOrElse(ShimLoader.getSparkShims
+    .leafNodeDefaultParallelism(ShimLoader.getSparkShims.sessionFromPlan(this)))
gerashegalov (Collaborator) commented on this diff:

We already have an inherited sparkSession function from GpuExec, so we can just write leafNodeDefaultParallelism(sparkSession).

but I think we can get away without a Shim if we simply spell out what https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L781 does. I think it's ok if we "accidentally" understand the 3.2.0+ property in prior Spark versions.

  val numSlices: Int = range.numSlices.getOrElse(
    sparkSession.conf.getOption("spark.sql.leafNodeDefaultParallelism").map(_.toInt)
      .getOrElse(sparkSession.sparkContext.defaultParallelism)
  )

wbo4958 (Collaborator, Author) replied Sep 8, 2021:

What if users have set "spark.sql.leafNodeDefaultParallelism" on a Spark version prior to 3.2? Then the behavior would differ between GPU and CPU. What's your opinion? @gerashegalov

gerashegalov (Collaborator) replied Sep 8, 2021:

The reason I thought it's OK to process spark.sql.leafNodeDefaultParallelism even for earlier versions is that:

  • it does not change query results logically
  • the behavior is easily tweakable by config

In this particular case, if we wanted to be very rigorous about it, we could call ShimLoader.getSparkShims.getSparkShimVersion and check the major and minor versions accordingly, and still avoid introducing extra shim changes. IMO it's not necessary.
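
For reference, a rough sketch of this version-gated alternative, written as it might appear inside GpuRangeExec (it assumes sparkSession and range are in scope, and that SparkShimVersion is a case class carrying major/minor/patch numbers; that shape is an assumption here, not confirmed by the thread):

  // Sketch of the version-gated fallback; the SparkShimVersion shape is assumed.
  val isAtLeast320: Boolean = ShimLoader.getSparkShims.getSparkShimVersion match {
    case SparkShimVersion(major, minor, _) => major > 3 || (major == 3 && minor >= 2)
    case _ => false // other distro version types (e.g. ClouderaShimVersion) would need their own mapping
  }
  val numSlices: Int = range.numSlices.getOrElse {
    if (isAtLeast320) {
      sparkSession.conf.getOption("spark.sql.leafNodeDefaultParallelism").map(_.toInt)
        .getOrElse(sparkSession.sparkContext.defaultParallelism)
    } else {
      sparkSession.sparkContext.defaultParallelism
    }
  }

The catch-all case is exactly where a newly added ShimVersion type could slip through unnoticed, which is the maintenance concern raised in the next reply.
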

wbo4958 (Collaborator, Author) replied:

Yeah, it does not change query results logically, but it will change the number of output files, so I think we should be as rigorous as Spark itself. Just like @revans2 said, users tend to write a script with this configuration and run their queries against different Spark versions.

And I'd prefer to add this in the shim layer, since getSparkShimVersion has to match the different version types (SparkShimVersion, ClouderaShimVersion, ...) to get the major/minor numbers. What if someone adds a new ShimVersion and forgets to update that check? @gerashegalov what's your opinion?

gerashegalov (Collaborator) replied:

IMO, in that scenario the user would rather appreciate seeing consistent behavior between Spark versions.

wbo4958 (Collaborator, Author) replied:

Sorry @gerashegalov, you have not convinced me. I'd like to merge this PR.

gerashegalov (Collaborator) replied:

Well, at least I tried :)

Having consistent behavior used to be a strong argument.

wbo4958 (Collaborator, Author) commented Sep 8, 2021

build

revans2 (Collaborator) left a review comment:

For me this looks fine. If @gerashegalov convinces you not to do it in the shim, I am okay with that too. I don't think it is going to be common for the config in question to be set on older versions of Spark, but I can see it happening, especially if someone is switching back and forth trying to test something between different versions.

sameerz added this to the Aug 30 - Sept 10 milestone on Sep 9, 2021
wbo4958 merged commit 9742ea7 into NVIDIA:branch-21.10 on Sep 9, 2021
wbo4958 deleted the leafNodeParallism branch on September 9, 2021 21:19
Successfully merging this pull request may close these issues:

[FEA] Add default parallelism configuration for Spark SQL queries