Insert buffer converters for TypedImperativeAggregate #3299
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
build
I am a little concerned that this code appears to assume AQE is on all the time. It also looks like we cannot have a TypedImperativeAggregate that does not use transitions. For approximate percentile it is going to use a very different algorithm, and supporting only transitions is going to be very hard. I would prefer to keep the old code if we have to pick just one way to do it.
shims/spark304/src/main/scala/com/nvidia/spark/rapids/shims/spark304/SparkBaseShims.scala
shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala
shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/SparkBaseShims.scala
shims/spark313/src/main/scala/com/nvidia/spark/rapids/shims/spark313/SparkBaseShims.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
build
I brought back the old code for "associated fallback". After that, we check whether all TypedImperativeAggregate buffers that cross between CPU and GPU can be converted at runtime. If so, we insert buffer converters. Otherwise, we fall back the entire Aggregation stack.
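The decision described in this comment can be sketched roughly as follows. This is a toy model, not the plugin's code: AggStage, canRunOnGpu, buffersConvertible, and decide are hypothetical names introduced only for illustration, and the real check operates on Spark plan metadata rather than on a simple list.

```scala
// Hypothetical model of the "convert or fall back" decision; not the plugin's real classes.
object FallbackDecisionSketch {
  // One aggregation stage (for example partial, merge, final), listed child-first.
  final case class AggStage(name: String, canRunOnGpu: Boolean, buffersConvertible: Boolean)

  sealed trait Decision
  case object FallBackWholeStack extends Decision
  // Each pair is a (child, parent) boundary where a buffer converter would be inserted.
  final case class InsertConverters(boundaries: Seq[(String, String)]) extends Decision

  def decide(stages: Seq[AggStage]): Decision = {
    // Adjacent (child, parent) pairs that would run on different devices.
    val mixed = stages.zip(stages.drop(1)).filter { case (child, parent) =>
      child.canRunOnGpu != parent.canRunOnGpu
    }
    // Every buffer crossing such a boundary must be convertible.
    val allConvertible = mixed.forall { case (child, parent) =>
      child.buffersConvertible && parent.buffersConvertible
    }
    if (allConvertible) InsertConverters(mixed.map { case (c, p) => (c.name, p.name) })
    else FallBackWholeStack
  }
}
```

In this model, a stack whose stages all run on the same device yields an empty list of boundaries, so no converters are needed and nothing falls back.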
It looks good. I just have a few nits, but I am happy to let it in without any changes.
.../spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
build
It looks like this is missing the Databricks shim updates for GpuColumnarToRowTransitionExec. If you are touching all the shims, please think about the Databricks ones as well.
I'll put up a PR to fix.
Sorry, it looks like this may have just been in my branch with various build changes. My apologies.
The current PR addresses the last task of #2916: supporting aggregation buffer conversion between the CPU and GPU formats for TypedImperativeAggregate functions. With the ability to insert buffer converters, we can handle TypedImperativeAggregate functions running across CPU and GPU at runtime. This means we no longer need to fall back the entire Aggregate stack to the CPU when one stage needs to fall back and the Aggregate contains TypedImperativeAggregate functions.
The general idea is to create buffer converters and bind them to certain physical plans during preColumnarTransitions, then integrate these buffer converters into the RowToColumnar/ColumnarToRow transitions as pre-processing/post-processing projections during postColumnarTransitions. This idea works even when AQE is on. To adapt to AQE, we leverage TreeNodeTag to cache temporary information, including the buffer converters and some metadata.
For better understanding, let's walk through the entire procedure of inserting buffer converters:
1.1 collecting all stages of the Aggregation that contain TypedImperativeAggregate functions
The binding procedure is triggered in GpuTypedImperativeSupportedAggregateExecMeta.tagPlanForGpu if the wrapped plan is the final stage. First, we collect all stages, as we do for associated fallback.
1.2 filtering the stages that need buffer converters to bridge the data gap with their child
1.3 creating buffer converters for the filtered stages
We add two new interfaces on GpuTypedImperativeSupportedAggregateExecMeta: createCpuToGpuBufferConverter and createGpuToCpuBufferConverter.
1.4 binding buffer converters into certain plans
The plans that carry the buffer converters are the CPU plans (which cannot be replaced) located right before/after the potential R2C/C2R transitions.
We add an extra field (preTransitions/postTransitions) to GpuRowToColumnar/GpuColumnarToRow in order to insert projections such as the buffer converters for TypedImperativeAggregate.
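To illustrate how a projection attached to a transition can convert an aggregation buffer, here is a minimal self-contained sketch. It is a toy model under stated assumptions, not the plugin's implementation: only the field names preTransitions and postTransitions come from the description above. Row, Projection, RowToColumnar, ColumnarToRow, cpuToGpu, and gpuToCpu are hypothetical stand-ins, and the "CPU format" (a comma-separated string) and "GPU format" (a Vector[Int]) are invented for the example. The real transitions work on Spark expressions and columnar batches.

```scala
// Toy model: transitions that optionally run a projection before or after the format change.
object TransitionSketch {
  type Row = Vector[Any]
  type Projection = Row => Row

  // Row-to-columnar: project each row first (if requested), then pivot rows into columns.
  final case class RowToColumnar(preTransitions: Option[Projection] = None) {
    def apply(rows: Seq[Row]): Vector[Vector[Any]] = {
      val projected = preTransitions.fold(rows)(p => rows.map(p))
      projected.toVector.transpose
    }
  }

  // Columnar-to-row: pivot columns back into rows, then project each row (if requested).
  final case class ColumnarToRow(postTransitions: Option[Projection] = None) {
    def apply(columns: Vector[Vector[Any]]): Seq[Row] = {
      val rows = columns.transpose
      postTransitions.fold(rows: Seq[Row])(p => rows.map(p))
    }
  }

  // Hypothetical converters for the aggregation buffer stored at column `ordinal`.
  // Assumed CPU format: comma-separated String. Assumed GPU format: Vector[Int].
  def cpuToGpu(ordinal: Int): Projection = row =>
    row.updated(
      ordinal,
      row(ordinal).asInstanceOf[String].split(",").filter(_.nonEmpty).map(_.toInt).toVector)

  def gpuToCpu(ordinal: Int): Projection = row =>
    row.updated(ordinal, row(ordinal).asInstanceOf[Vector[Int]].mkString(","))
}
```

For example, RowToColumnar(Some(cpuToGpu(1))) rewrites column 1 of every row into the assumed GPU format before the rows are pivoted, and ColumnarToRow(Some(gpuToCpu(1))) restores the assumed CPU format after pivoting back. Keeping the projection as an optional field means transitions that carry no TypedImperativeAggregate buffers behave as before.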