Add in running window optimization using scan (#2895)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Jul 13, 2021
1 parent 471746a commit 31873a0
Showing 5 changed files with 569 additions and 44 deletions.
100 changes: 87 additions & 13 deletions integration_tests/src/main/python/window_function_test.py
@@ -198,8 +198,9 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen):
' range between 9223372036854775807 preceding and 9223372036854775807 following) as sum_c_asc, '
'from window_agg_table')


@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('data_gen', [
_grpkey_byte_with_nulls,
@@ -246,7 +247,9 @@ def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
'from window_agg_table ',
conf = conf)

@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls,
_grpkey_longs_with_nulls,
@@ -281,10 +284,8 @@ def test_window_aggs_for_rows(data_gen, batch_size):


# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this. Right now this is just
# row number, but will expand to others in the future (rank and dense_rank).
@ignore_order
@approximate_float
# specially, but it only works if all of the aggregations can support this.
# the order returned should be consistent because the data ends up in a single task (no partitioning)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen', all_basic_gens_no_nans + [decimal_gen_scale_precision], ids=meta_idfn('data:'))
def test_window_running_no_part(b_gen, batch_size):
@@ -295,6 +296,9 @@ def test_window_running_no_part(b_gen, batch_size):
'count(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col',
'min(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col',
'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col']
if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen):
query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col')

assert_gpu_and_cpu_are_equal_sql(
lambda spark : two_col_df(spark, LongRangeGen(), b_gen, length=1024 * 14),
"window_agg_table",
@@ -304,21 +308,55 @@ def test_window_running_no_part(b_gen, batch_size):
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)
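The frame used by these running-window tests, rows between UNBOUNDED PRECEDING AND CURRENT ROW, is what the scan optimization in this commit targets: the aggregation state can be carried forward row by row instead of re-aggregating the whole frame for every output row. A minimal sketch of that idea (an editorial illustration, not the plugin's implementation):

def running_sum(values):
    # Prefix-scan equivalent of
    # sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW)
    total = 0
    out = []
    for v in values:       # rows already sorted by the order-by column
        total += v         # carry the running state forward instead of re-summing the frame
        out.append(total)  # one output value per input row
    return out

assert running_sum([1, 2, 3, 4]) == [1, 3, 6, 10]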

# Test that we can do a running window sum on floats and doubles. This is problematic because we do the agg in parallel,
# which means the result can switch back and forth between Inf and not Inf depending on the order of the aggregations.
# We guard against this by limiting the range of the summed values so they never hit Inf, and by using abs so we don't
# have positive and negative values that interfere with each other.
# the order returned should be consistent because the data ends up in a single task (no partitioning)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_running_float_sum_no_part(batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.variableFloatAgg.enabled': True,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
query_parts = ['a',
'sum(cast(b as double)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_dbl_sum',
'sum(abs(dbl)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum',
'sum(cast(b as float)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_flt_sum',
'sum(abs(flt)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum']

gen = StructGen([('a', LongRangeGen()),('b', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False)
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, gen, length=1024 * 14),
"window_agg_table",
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)
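A small illustration (not part of the diff) of why the comment above keeps the summed values away from the overflow boundary: with doubles, whether an intermediate result overflows to inf depends on the order in which the additions happen, so a batched or parallel running sum could disagree with the sequential CPU result.

big = 1.0e308
print(big + big - big)  # inf     -- the partial sum overflows before the subtraction
print(big - big + big)  # 1e+308  -- no intermediate value ever overflows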

# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this. Right now this is just
# row number, but will expand to others in the future (rank and dense_rank).
@ignore_order
# specially, but it only works if all of the aggregations can support this.
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] +
[(x, long_gen) for x in all_basic_gens_no_nans + [decimal_gen_scale_precision]], ids=idfn)
def test_window_running(b_gen, c_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.hasNans': False,
'spark.rapids.sql.variableFloatAgg.enabled': True,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
query_parts = ['row_number() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as row_num',
'count(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col',
'min(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col',
'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col']

# Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering
if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)):
query_parts.append('sum(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col')

assert_gpu_and_cpu_are_equal_sql(
lambda spark : three_col_df(spark, LongRangeGen(), RepeatSeqGen(b_gen, length=100), c_gen, length=1024 * 14),
"window_agg_table",
@@ -328,8 +366,40 @@ def test_window_running(b_gen, c_gen, batch_size):
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)
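The decimal exclusion above comes from Spark widening the result type of a decimal sum (roughly input precision + 10, capped at 38), so a running sum over a high-precision decimal can exceed what the GPU supports. A hedged sketch, assuming a local SparkSession and Spark's default decimal rules; the column names here are illustrative only:

from decimal import Decimal
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, Decimal("1.0"))], "a long, c decimal(18,3)")
w = Window.orderBy("a").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# The windowed sum comes back wider than its input, decimal(28,3) under the default rule.
df.select(f.sum("c").over(w).alias("sum_col")).printSchema()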

# Test that we can do a running window sum on floats, doubles, and decimals. This is problematic because we do the agg in
# parallel, which means the result can switch back and forth between Inf and not Inf depending on the order of the aggregations.
# We guard against this by limiting the range of the summed values so they never hit Inf, and by using abs so we don't
# have positive and negative values that interfere with each other.
# Decimal is also problematic if the precision is so high that the operation falls back to the CPU.
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_window_running_float_decimal_sum(batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.variableFloatAgg.enabled': True,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
# TODO need a way to insert NaNs...
query_parts = ['b', 'a',
'sum(cast(c as double)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_dbl_sum',
'sum(abs(dbl)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum',
'sum(cast(c as float)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_flt_sum',
'sum(abs(flt)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum',
'sum(cast(c as Decimal(6,1))) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dec_sum']

gen = StructGen([('a', LongRangeGen()),('b', RepeatSeqGen(int_gen, length=1000)),('c', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False)
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, gen, length=1024 * 14),
"window_agg_table",
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)
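One more aside (not from the commit) on the abs() trick in the comment above: floating point addition is not associative, so with mixed signs different groupings of the partial sums can produce different results; keeping every term non-negative avoids the cancellation cases where the orderings diverge the most.

a, b, c = 1.0e16, -1.0e16, 1.0
print((a + b) + c)  # 1.0
print(a + (b + c))  # 0.0 -- the 1.0 is absorbed when it is added to -1e16 first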

@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('c_gen', lead_lag_data_gens, ids=idfn)
@@ -454,7 +524,9 @@ def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen, bat

# lead and lag don't currently work for string columns, so redo the tests, but just for strings
# without lead and lag
@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('c_gen', [string_gen], ids=idfn)
@pytest.mark.parametrize('a_b_gen', part_and_order_gens, ids=meta_idfn('partAndOrderBy:'))
@@ -483,7 +555,9 @@ def do_it(spark):


# Test for RANGE queries, with timestamp order-by expressions.
@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps,
pytest.param(_grpkey_longs_with_nullable_timestamps)],
ids=idfn)