
Fix issues with AQE and DPP enabled on Spark 3.2 [databricks] #3691

Merged
merged 4 commits into NVIDIA:branch-21.10 on Sep 30, 2021

Conversation

@jlowe jlowe (Member) commented Sep 28, 2021

Fixes #3653.

This fixes a number of issues with GpuBroadcastToCpuExec. In Spark 3.2+, there's an assert to verify that the child of an AdaptiveSparkPlanExec is a BroadcastQueryStageExec if it tries to compute the broadcast. The transition plans would place GpuBroadcastToCpuExec immediately under AdaptiveSparkPlanExec which triggered the assert.

Unfortunately creating a new BroadcastQueryStageExec instance requires shimming, since the signature of BroadcastQueryStageExec has changed across Spark versions. Newer versions require a canonicalized plan parameter, so I needed to add a shim method to create a new instance given an old one and a new child to place underneath.
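A rough sketch of such a shim factory method for the pre-3.2 shims follows; the method and parameter names here are assumptions rather than the exact ones in this PR:

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec

// Illustrative shim method (names assumed): given an existing stage and a new
// child plan, build a fresh BroadcastQueryStageExec. On Spark 3.0.x/3.1.x the
// case class takes only an id and a plan, so this form compiles there; the
// 3.2+ constructor adds a canonicalized-plan parameter (see the build error
// and follow-up fix later in this conversation).
def newBroadcastQueryStageExec(
    old: BroadcastQueryStageExec,
    newPlan: SparkPlan): BroadcastQueryStageExec =
  BroadcastQueryStageExec(old.id, newPlan)
```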

Besides fixing that assert, there was also a mishandling of the broadcast data in GpuBroadcastToCpuExec. The code assumed that ColumnarBatch.rowIterator would return a unique row per iteration, but it actually returns the same row object instance on each iteration and mutates that object's internal state to produce the proper values when the row's data is requested. The code was producing an array of InternalRow without manifesting the data from each row, which caused every entry to be the same ColumnarBatchRow instance, pointing at the last row of data. In addition, the UnsafeRow instances were being reused in a similar manner. The fix is to not force the internal rows into an array until they are converted to unsafe rows, and to make a copy of each unsafe row at conversion time.
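The shape of the fix, using the `toUnsafe` converter and `gpuBatches` collection that appear in the reviewed diff further down; the "before" lines are paraphrased from this description rather than quoted from the old code:

```scala
import scala.collection.JavaConverters._

// Before (broken, paraphrased): rowIterator() hands back the same mutable row
// object on every next() call, so materializing the iterator into an array
// yields N references to the last row, and the reused UnsafeRow instances
// compound the problem:
//   val rows = batch.rowIterator().asScala.toArray
//   val unsafeRows = rows.iterator.map(toUnsafe)

// After (fixed): stay lazy until conversion and copy each UnsafeRow so every
// stored row owns its own backing data.
val unsafeRows = gpuBatches.flatMap {
  _.rowIterator().asScala.map(r => toUnsafe(r).copy())
}
```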

One last issue, which ended up being the majority of the change in this PR, is that BroadcastQueryStageExec expects a BroadcastExchangeLike child, so to fix the original assert, GpuBroadcastToCpuExec needs to be a BroadcastExchangeLike. Unfortunately BroadcastExchangeLike was forcing shims because Databricks changed the signature of that trait. Rather than shim yet another class that needs BroadcastExchangeLike, I ended up creating a shim v2 version of BroadcastExchangeLike, called ShimBroadcastExchangeLike, that encapsulates the differences between Apache Spark and Databricks for this trait. This not only lets us avoid shimming GpuBroadcastToCpuExec but also lets us unshim GpuBroadcastExchangeExec, which was only being shimmed due to this trait signature difference.
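A sketch of the shim-v2 trait pattern described here. The PR text does not spell out which members differ between Apache Spark and Databricks, so the promise/completionFuture body below is an assumed example of the kind of divergence the trait hides:

```scala
package com.nvidia.spark.rapids.shims.v2

import scala.concurrent.{Future, Promise}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike

// Each Spark build (Apache Spark vs. Databricks) gets its own copy of this
// trait, so GpuBroadcastExchangeExec and GpuBroadcastToCpuExec can extend
// ShimBroadcastExchangeLike once instead of being shimmed themselves.
trait ShimBroadcastExchangeLike extends BroadcastExchangeLike {
  // Assumed example member: satisfy whatever signature this particular build's
  // BroadcastExchangeLike declares for broadcast completion.
  @transient protected lazy val promise = Promise[Broadcast[Any]]()

  @transient lazy val completionFuture: Future[Broadcast[Any]] = promise.future
}
```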

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
@jlowe jlowe added this to the Sep 27 - Oct 1 milestone Sep 28, 2021
@jlowe jlowe self-assigned this Sep 28, 2021
@jlowe jlowe changed the title Fix issues with AQE and DPP enabled on Spark 3.2 Fix issues with AQE and DPP enabled on Spark 3.2 [databricks] Sep 28, 2021
@jlowe jlowe (Member Author) commented Sep 28, 2021

build

@abellina abellina (Collaborator) commented:

Looks like a databricks301 build issue:


[2021-09-28T20:35:06.698Z] [ERROR] /home/ubuntu/spark-rapids/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala:69: not enough arguments for method apply: (id: Int, plan: org.apache.spark.sql.execution.SparkPlan, _canonicalized: org.apache.spark.sql.execution.SparkPlan)org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec in object BroadcastQueryStageExec.
[2021-09-28T20:35:06.698Z] Unspecified value parameter _canonicalized.
[2021-09-28T20:35:06.698Z] [ERROR]       newPlan: SparkPlan): BroadcastQueryStageExec = BroadcastQueryStageExec(old.id, newPlan)
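The later commit "Add canonicalized parameter for 301db shim" presumably addresses this by supplying the third constructor argument; a hedged sketch, where the choice of which plan to pass as the canonicalized one is an assumption:

```scala
// Hedged sketch of the 301db/3.2-style shim: the constructor also requires a
// canonicalized plan. Reusing the old stage's _canonicalized field is an
// assumption made for illustration only.
def newBroadcastQueryStageExec(
    old: BroadcastQueryStageExec,
    newPlan: SparkPlan): BroadcastQueryStageExec =
  BroadcastQueryStageExec(old.id, newPlan, old._canonicalized)
```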

@jlowe jlowe (Member Author) commented Sep 28, 2021

build

1 similar comment
@jlowe jlowe (Member Author) commented Sep 29, 2021

build

@sameerz sameerz added the bug (Something isn't working) and Spark 3.2+ labels Sep 29, 2021
tgravescs previously approved these changes Sep 29, 2021

@tgravescs tgravescs (Collaborator) left a comment

It might be nice to add an integration test with this combo. I think we may have issues filed for both DPP and AQE testing, though I'm not sure about testing them together.
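For reference, such a test would need both features switched on together; a minimal, hypothetical sketch (the object name, harness, and query are not the project's actual integration tests):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: enable AQE and DPP together on the plugin, then compare
// CPU and GPU results for a partitioned join that triggers dynamic partition
// pruning.
object AqeDppComboSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
    .getOrCreate()

  // The usual DPP-exercising shape: a fact table partitioned on the join key,
  // joined to a dimension table with a selective filter.
}
```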

@jlowe jlowe marked this pull request as draft September 29, 2021 13:41
@jlowe jlowe (Member Author) commented Sep 29, 2021

There are some double-close issues in some cases; converting to draft while I investigate.

@jlowe jlowe (Member Author) commented Sep 29, 2021

Found the double-close issue: it had to do with multiple columns appearing in the same batch all referencing the same buffer.
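A hypothetical illustration of that failure mode (not the actual fix in this PR): when several deserialized columns wrap the same host buffer, closing each column must not release the shared buffer more than once, and reference counting is one way to keep the closes balanced.

```scala
// Sketch only: a generic ref-counted buffer stands in for the real host buffer.
final class SharedHostBuffer {
  private var refs = 1
  def retain(): SharedHostBuffer = { refs += 1; this }
  def close(): Unit = {
    refs -= 1
    require(refs >= 0, "buffer closed more times than it was retained")
    if (refs == 0) {
      // release the underlying memory exactly once
    }
  }
}

final class HostColumn(buffer: SharedHostBuffer) {
  def close(): Unit = buffer.close()
}

object DoubleCloseSketch extends App {
  val shared = new SharedHostBuffer
  // Wrong: wrapping `shared` twice without retain() would close it twice.
  // Right: every additional column retains the shared buffer before wrapping it.
  val columns = Seq(new HostColumn(shared), new HostColumn(shared.retain()))
  columns.foreach(_.close()) // balanced: the memory is released exactly once
}
```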

@jlowe jlowe (Member Author) commented Sep 29, 2021

build

@jlowe jlowe marked this pull request as ready for review September 29, 2021 13:57
@jlowe jlowe marked this pull request as draft September 29, 2021 14:10
@jlowe jlowe (Member Author) commented Sep 29, 2021

I think I've fixed the problems with HostColumnVector deserialization. Queries seem to be computing proper results, but I'm seeing the CPU and GPU execute queries with very different shapes (e.g., the CPU never runs a stage with more than 35 partitions, yet the GPU will run many stages with the full 200 partitions). Investigating.

@jlowe jlowe (Member Author) commented Sep 30, 2021

The occasional lack of AQE shuffle coalescing on GPU queries is unrelated to this change. Filed #3713 to track it separately.

@jlowe jlowe marked this pull request as ready for review September 30, 2021 00:31
@jlowe jlowe (Member Author) commented Sep 30, 2021

build

Inline review comment on this hunk of the change (excerpt):

    val unsafeRows = rows.iterator.map(toUnsafe)
    val relation = ShimLoader.getSparkShims
    val unsafeRows = gpuBatches.flatMap {
      _.rowIterator().asScala.map(r => toUnsafe(r).copy())

Collaborator: nit, it may be worth adding a comment above this explaining the reason for the copy. I think folks can backtrack to this GitHub issue, and it can be done later too, so as not to block this PR from an expensive CI run today.

@abellina abellina (Collaborator) left a comment

lgtm

@abellina abellina (Collaborator) commented:

Note I am running with this change in spark2a, and I haven't found the NPE I was seeing before.

@tgravescs tgravescs merged commit 85438b9 into NVIDIA:branch-21.10 Sep 30, 2021
tgravescs added a commit that referenced this pull request Oct 1, 2021
* Fix issues with AQE and DPP enabled on Spark 3.2 [databricks] (#3691)

* Fix issues with AQE and DPP enabled on Spark 3.2

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Add canonicalized parameter for 301db shim

* Fix double-close when batch contains multiple columns

* Fix HostColumnVector deserialization

* CDH build stopped working due to missing jars in maven repo (#3722)

fixes #3718

Evidently some jars were removed from the cdh maven repo that were pulled in through spark-hive -> spark-core -> curator-recipes. We don't use that version as it's explicitly called out in the cdh profiles. Just exclude spark-core when pulling in the spark-hive dep (roughly as sketched below). Built and unit tests pass.

I did see a couple of other dependency warnings but then didn't see them again. I'll rerun with a clean m2, but that shouldn't block this fix for the build.

For reference the error was:
`Could not resolve dependencies for project com.nvidia:rapids-4-spark-sql_2.12:jar:21.10.0-SNAPSHOT: Failed to collect dependencies at org.apache.spark:spark-hive_2.12:jar:3.1.1.3.1.7270.0-253 -> org.apache.spark:spark-core_2.12:jar:3.1.1.3.1.7270.0-253 -> org.apache.curator:curator-recipes:jar:4.3.0.7.2.7.0-SNAPSHOT: Failed to read artifact descriptor for org.apache.curator:curator-recipes:jar:4.3.0.7.2.7.0-SNAPSHOT: Could not transfer artifact org.apache.curator:curator-recipes:pom:4.3.0.7.2.7.0-SNAPSHOT from/to cloudera (https://repo.hortonworks.com/nexus/content/groups/public): PKIX path building failed:`

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

Co-authored-by: Thomas Graves <tgraves@nvidia.com>
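The exclusion described in that commit would look roughly like the following in the cdh profile's pom; the version property and artifact coordinates here are illustrative assumptions, not quoted from the actual build file.

```xml
<!-- Illustrative sketch only: exclude spark-core when pulling in the CDH
     spark-hive dependency so the broken curator-recipes transitive
     dependency is never resolved. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>${spark.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```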
Labels
bug (Something isn't working), Spark 3.2+
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] Issue seen with AQE on in Q5 (possibly others) using Spark 3.2 rc3
4 participants