Add in running window optimization using scan #2895

Merged: 4 commits, Jul 13, 2021
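
Reviewer note (not part of the diff): a frame like rows between UNBOUNDED PRECEDING AND CURRENT ROW only ever looks backwards, so every aggregation tested here is a prefix aggregation and can be computed with one scan over each sorted batch instead of re-evaluating the window per row. A minimal pure-Python sketch of the idea; the plugin does this on the GPU inside GpuRunningWindowExec, presumably via cuDF scan operations, so this is only an illustration:

from itertools import accumulate

def running_window(values):
    # Each output row i depends only on input rows 0..i, so a single forward
    # pass (a scan / prefix aggregation) produces every result at once.
    n = len(values)
    return {
        'row_num':   list(range(1, n + 1)),
        'count_col': list(range(1, n + 1)),          # ignoring nulls for brevity
        'min_col':   list(accumulate(values, min)),
        'max_col':   list(accumulate(values, max)),
        'sum_col':   list(accumulate(values)),
    }

print(running_window([3, 1, 4, 1, 5]))
# row_num/count_col: [1, 2, 3, 4, 5]
# min_col: [3, 1, 1, 1, 1], max_col: [3, 3, 4, 4, 4], sum_col: [3, 4, 8, 9, 14]
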
100 changes: 87 additions & 13 deletions integration_tests/src/main/python/window_function_test.py
@@ -198,8 +198,9 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen):
' range between 9223372036854775807 preceding and 9223372036854775807 following) as sum_c_asc, '
'from window_agg_table')


@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('data_gen', [
_grpkey_byte_with_nulls,
@@ -246,7 +247,9 @@ def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
'from window_agg_table ',
conf = conf)
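
Note on the marker change above (illustrative, based on how I read the integration-test harness, not code from this diff): ignore_order(local=True) tells the comparison helper to sort the already-collected rows on the driver instead of injecting a sort into the query, which is what "do the final order by locally" means. Conceptually:

def assert_equal_ignoring_order(cpu_rows, gpu_rows):
    # Sort locally on the driver; with tiny batchSizeBytes a distributed
    # sort of the full result would dominate the test's runtime.
    assert sorted(cpu_rows) == sorted(gpu_rows)

assert_equal_ignoring_order([(2, 'b'), (1, 'a')], [(1, 'a'), (2, 'b')])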

@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls,
_grpkey_longs_with_nulls,
@@ -281,10 +284,8 @@ def test_window_aggs_for_rows(data_gen, batch_size):


# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this. Right now this is just
# row number, but will expand to others in the future (rank and dense_rank).
@ignore_order
@approximate_float
# specially, but it only works if all of the aggregations can support this.
# the order returned should be consistent because the data ends up in a single task (no partitioning)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen', all_basic_gens_no_nans + [decimal_gen_scale_precision], ids=meta_idfn('data:'))
def test_window_running_no_part(b_gen, batch_size):
@@ -295,6 +296,9 @@ def test_window_running_no_part(b_gen, batch_size):
'count(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col',
'min(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col',
'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col']
if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen):
query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col')

assert_gpu_and_cpu_are_equal_sql(
lambda spark : two_col_df(spark, LongRangeGen(), b_gen, length=1024 * 14),
"window_agg_table",
@@ -304,21 +308,55 @@ def test_window_running_no_part(b_gen, batch_size):
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)

# Test that we can do a running window sum on floats and doubles. This becomes problematic because we do the agg in parallel
# which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations.
# We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have
# positive and negative values that interfere with each other.
# the order returned should be consistent because the data ends up in a single task (no partitioning)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_running_float_sum_no_part(batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.variableFloatAgg.enabled': True,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
query_parts = ['a',
'sum(cast(b as double)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_dbl_sum',
'sum(abs(dbl)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum',
'sum(cast(b as float)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_flt_sum',
'sum(abs(flt)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum']

gen = StructGen([('a', LongRangeGen()),('b', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False)
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, gen, length=1024 * 14),
"window_agg_table",
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)
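
To see why the comment above restricts the values (illustrative numbers, not from the test): double addition stops being associative once a partial sum overflows, so a batched or parallel running sum can report Inf where a strictly sequential one would not, and vice versa. For example:

big = 1.5e308                      # close to the double max of ~1.8e308
vals = [big, big, -big, -big]

seq = 0.0
for v in vals:                     # strict left-to-right running sum
    seq += v                       # overflows to inf on the second addition
print(seq)                         # inf (and it never recovers)

print((big + -big) + (big + -big)) # 0.0 with a different grouping

# Keeping the inputs small (shorts cast to double/float) and using abs() keeps
# the running sums far from Inf, so the aggregation order no longer matters.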

# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this. Right now this is just
# row number, but will expand to others in the future (rank and dense_rank).
@ignore_order
# specially, but it only works if all of the aggregations can support this.
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] +
[(x, long_gen) for x in all_basic_gens_no_nans + [decimal_gen_scale_precision]], ids=idfn)
def test_window_running(b_gen, c_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.hasNans': False,
'spark.rapids.sql.variableFloatAgg.enabled': True,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
query_parts = ['row_number() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as row_num',
'count(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col',
'min(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col',
'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col']

# Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering
if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)):
query_parts.append('sum(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col')

assert_gpu_and_cpu_are_equal_sql(
lambda spark : three_col_df(spark, LongRangeGen(), RepeatSeqGen(b_gen, length=100), c_gen, length=1024 * 14),
"window_agg_table",
@@ -328,8 +366,40 @@ def test_window_running(b_gen, c_gen, batch_size):
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)
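
For the partitioned variant above, the running optimization still needs only one pass per batch as long as rows arrive sorted by the partition key and then the order key: the accumulator simply resets at each partition boundary (conceptually a grouped/segmented scan, which is presumably how the GPU scan-based path handles it). A small pure-Python sketch, assuming pre-sorted input:

def running_sum_by_partition(rows):
    # rows: (partition_key, order_key, value), already sorted by
    # (partition_key, order_key). Reset the total whenever the partition
    # key changes, like a segmented scan.
    out = []
    current = object()          # sentinel that never equals a real key
    total = 0
    for part, _order, value in rows:
        if part != current:
            current, total = part, 0
        total += value
        out.append((part, total))
    return out

print(running_sum_by_partition(
    [('a', 1, 10), ('a', 2, 5), ('b', 1, 7), ('b', 2, 1)]))
# [('a', 10), ('a', 15), ('b', 7), ('b', 8)]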

# Test that we can do a running window sum on floats and doubles and decimal. This becomes problematic because we do the agg in parallel
# which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations.
# We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have
# positive and negative values that interfere with each other.
# decimal is problematic if the precision is so high it falls back to the CPU.
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_window_running_float_decimal_sum(batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.variableFloatAgg.enabled': True,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
# TODO need a way to insert NaNs...
query_parts = ['b', 'a',
'sum(cast(c as double)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_dbl_sum',
'sum(abs(dbl)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum',
'sum(cast(c as float)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_flt_sum',
'sum(abs(flt)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum',
'sum(cast(c as Decimal(6,1))) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dec_sum']

gen = StructGen([('a', LongRangeGen()),('b', RepeatSeqGen(int_gen, length=1000)),('c', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False)
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, gen, length=1024 * 14),
"window_agg_table",
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)
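
On the "decimal is problematic if the precision is so high it falls back to the CPU" comment: Spark widens sum over Decimal(p, s) to roughly Decimal(min(p + 10, 38), s), so casting the input down to Decimal(6,1) keeps the widened result well inside what the plugin could handle at the time. A tiny sketch of that widening rule (the exact bound is Spark's, quoted from memory, so treat it as an assumption):

def sum_result_type(precision, scale, max_precision=38):
    # Spark widens sum over DecimalType(p, s) to DecimalType(min(p + 10, 38), s).
    return min(precision + 10, max_precision), scale

print(sum_result_type(6, 1))    # (16, 1) -> small enough to stay on the GPU
print(sum_result_type(32, 2))   # (38, 2) -> past the supported precision, per the comment above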

@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('c_gen', lead_lag_data_gens, ids=idfn)
@@ -454,7 +524,9 @@ def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen, bat

# lead and lag don't currently work for string columns, so redo the tests, but just for strings
# without lead and lag
@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('c_gen', [string_gen], ids=idfn)
@pytest.mark.parametrize('a_b_gen', part_and_order_gens, ids=meta_idfn('partAndOrderBy:'))
@@ -483,7 +555,9 @@ def do_it(spark):


# Test for RANGE queries, with timestamp order-by expressions.
@ignore_order
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps,
pytest.param(_grpkey_longs_with_nullable_timestamps)],
ids=idfn)
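
For readers unfamiliar with RANGE frames over timestamps (a self-contained PySpark example, not taken from the test; table, column names, and values are made up): the order-by value itself defines the frame, e.g. "everything within the last day of the current row's timestamp":

from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("range-window-demo").getOrCreate()
rows = [(1, datetime(2021, 7, 1)), (1, datetime(2021, 7, 2)), (1, datetime(2021, 7, 3, 12, 0))]
spark.createDataFrame(rows, "grp INT, ts TIMESTAMP").createOrReplaceTempView("t")
spark.sql("""
    select grp, ts,
           count(1) over (partition by grp order by ts
                          range between interval 1 day preceding and current row) as cnt_last_day
    from t
""").show(truncate=False)   # cnt_last_day is 1, 2, 1 for the three rows
spark.stop()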