Skip to content

Commit

Permalink
Split window exec into multiple stages if needed (#2845)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Jun 30, 2021
1 parent 79f02b7 commit 628a9ce
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 67 deletions.
8 changes: 7 additions & 1 deletion integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,20 +401,26 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}, is_cpu_first=True):
"""
_assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf, is_cpu_first=is_cpu_first)

def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True):
def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True, validate_execs_in_gpu_plan=[]):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df_fun: a function that will create the dataframe
:param table_name: Name of table to be created with the dataframe
:param sql: SQL query to be run on the specified table
:param conf: Any user-specified confs. Empty by default.
:param debug: Boolean to indicate if the SQL output should be printed
:param is_cpu_first: Boolean to indicate if the CPU should be run first or not
    :param validate_execs_in_gpu_plan: String list of exec names that must appear in the GPU plan.
:return: Assertion failure, if results from CPU and GPU do not match.
"""
if conf is None:
conf = {}
def do_it_all(spark):
df = df_fun(spark)
df.createOrReplaceTempView(table_name)
# we hold off on setting the validate execs until after creating the temp view

spark.conf.set('spark.rapids.sql.test.validateExecsInGpuPlan', ','.join(validate_execs_in_gpu_plan))
if debug:
return data_gen.debug_df(spark.sql(sql))
else:
Expand Down
5 changes: 4 additions & 1 deletion integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def test_window_running_no_part(b_gen, batch_size):
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)

# This is for aggregations that work with a running window optimization. They don't need to be batched
Expand All @@ -319,6 +320,7 @@ def test_window_running(b_gen, c_gen, batch_size):
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)

@ignore_order
Expand Down Expand Up @@ -527,6 +529,8 @@ def test_window_aggs_for_rows_collect_list():

# SortExec does not support array type, so sort the result locally.
@ignore_order(local=True)
# This test is aimed more at Databricks and their running window optimization than at ours;
# that is why we do not yet validate that a GpuRunningWindowExec was inserted.
def test_running_window_function_exec_for_all_aggs():
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, _gen_data_for_collect_list),
Expand Down Expand Up @@ -554,7 +558,6 @@ def test_running_window_function_exec_for_all_aggs():
from window_collect_table
''')


# Generates some repeated values to test the deduplication of GpuCollectSet.
# And GpuCollectSet does not yet support struct type.
_gen_data_for_collect_set = [
Expand Down
Loading

0 comments on commit 628a9ce

Please sign in to comment.