Add in running window optimization using scan #2895

Merged: 4 commits into NVIDIA:branch-21.08 on Jul 13, 2021

Conversation

@revans2 (Collaborator) commented Jul 9, 2021

Spark optimizes running windows to use a linear-time algorithm. When this was discussed with the cudf team (rapidsai/cudf#8440) it was decided to use scan and segmented_scan (group-by scan). This PR puts in a framework for that and adds a few initial implementations.
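
As a rough illustration (plain Scala, not the plugin's actual code), a running window over ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is just a prefix scan, and the partitioned version is a scan that restarts at each group boundary. The sequences and names below are made up for the example:

// Running max over an unbounded-preceding frame is a prefix scan, so each
// output row only needs the previous result: O(n) instead of O(n^2).
val ids = Seq(3L, 1L, 4L, 1L, 5L)
val runningMax = ids.scanLeft(Long.MinValue)(math.max).tail
// runningMax == Seq(3, 3, 4, 4, 5)

// A segmented (group-by) scan restarts the accumulation whenever the
// partition key changes, which is what a partitioned running window needs.
val parts = Seq("a", "a", "b", "b", "b")
val segmentedMax = parts.zip(ids)
  .foldLeft((Option.empty[String], List.empty[Long])) { case ((prevKey, acc), (key, value)) =>
    val next = if (prevKey.contains(key)) math.max(acc.head, value) else value
    (Some(key), next :: acc)
  }._2.reverse
// segmentedMax == Seq(3, 3, 4, 4, 5)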

In performance tests on my local box, row_number and count are only slightly faster now than they were under the old window code, but min, max, and sum all show significant performance gains, similar to those I showed were possible in rapidsai/cudf#8440.

For a large max running window with no partition-by I have seen performance improvements of 171x (cold) and 542x (hot) compared to the CPU:

scala> spark.time(spark.range(0L, Int.MaxValue, 1L, numPartitions=1).select(max("id").over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)).as("RN"), col("id")).orderBy(desc("RN")).show)
...
Time taken: 3633 ms
scala> spark.time(spark.range(0L, Int.MaxValue, 1L, numPartitions=1).select(max("id").over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)).as("RN"), col("id")).orderBy(desc("RN")).show)
...
Time taken: 1153 ms
scala> spark.conf.set("spark.rapids.sql.enabled", "false")
scala> spark.time(spark.range(0L, Int.MaxValue, 1L, numPartitions=1).select(max("id").over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)).as("RN"), col("id")).orderBy(desc("RN")).show)
...
Time taken: 622094 ms
scala> spark.conf.set("spark.rapids.sql.enabled", "false")
scala> spark.time(spark.range(0L, Int.MaxValue, 1L, numPartitions=1).select(max("id").over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)).as("RN"), col("id")).orderBy(desc("RN")).show)
...
Time taken: 625660 ms

This is a special case: when no partition-by is given, all of the data goes to a single task, so the CPU only gets a single core to process it. But as you can see, it would still take hundreds of CPU cores in the partitioned case to offset the performance gains.
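
For reference, a partitioned variant of the query above might look like the following sketch (the 16 input partitions and 100-way part key are made up for illustration). With a partition-by, Spark can spread the partitions across tasks and CPU cores, while each partition remains a linear running scan:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Hypothetical partitioned running-window query; the sizes are illustrative.
val df = spark.range(0L, Int.MaxValue, 1L, numPartitions = 16)
  .withColumn("part", col("id") % 100)
val w = Window.partitionBy("part").orderBy("id")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
spark.time(df.select(max("id").over(w).as("running_max"), col("id"), col("part")).show)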

With the previous GPU code I could not complete this run at all; I had to kill it before my GPU overheated.

This is a stepping stone toward supporting rank and dense_rank.

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
@revans2 revans2 added the performance A performance related task/issue label Jul 9, 2021
@revans2 revans2 added this to the July 5 - July 16 milestone Jul 9, 2021
@revans2 revans2 self-assigned this Jul 9, 2021
@revans2 (Collaborator, Author) commented Jul 9, 2021

I tested this on Databricks and it works there too.

@revans2 (Collaborator, Author) commented Jul 9, 2021

build

@revans2 (Collaborator, Author) commented Jul 12, 2021

During the review work I accidentally checked in a change that makes this require the fix from rapidsai/cudf#8705. I am inclined to wait for that to get merged, but if others want to merge this sooner I can revert the small change and do a follow-on PR when the cudf change does get merged in.

@revans2 (Collaborator, Author) commented Jul 13, 2021

build

@revans2 (Collaborator, Author) commented Jul 13, 2021

build

@revans2 revans2 merged commit 31873a0 into NVIDIA:branch-21.08 Jul 13, 2021
@revans2 revans2 deleted the window_scan branch July 13, 2021 18:16