
Commit

Read the complete batch before returning when selectedAttributes is empty (#2935)

* Read the cache even if the selected attributes is empty

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Read the entire batch

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
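
As a rough illustration of the behavior change (the function, its parameters, and projectBatch below are hypothetical, not the actual ParquetCachedBatchSerializer code): when no attributes are selected, as happens for a plain count(), the cached batches are still read in full instead of being skipped.

// Hypothetical sketch only -- not the real spark-rapids code path.
// When nothing is selected (e.g. count()), still hand back every cached
// batch in full instead of short-circuiting.
def decodeCachedBatches[T](cachedBatches: Iterator[T],
                           selectedAttributeCount: Int,
                           projectBatch: T => T): Iterator[T] = {
  if (selectedAttributeCount == 0) {
    cachedBatches // read the complete batch before returning
  } else {
    cachedBatches.map(projectBatch)
  }
}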

* Addressed feedback

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Removed the catch exception clause and wrapped the Iterator in CloseableIterator

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
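
A minimal Scala sketch of the wrapping described above; CloseableIterator matches the name in the bullet, but the trait and the IteratorWithClose class are illustrative rather than the actual spark-rapids types. The point is that the iterator itself exposes close(), so resources are released explicitly instead of from a catch clause.

// Illustrative only: an iterator that can also be closed explicitly.
trait CloseableIterator[T] extends Iterator[T] with AutoCloseable

class IteratorWithClose[T](underlying: Iterator[T], onClose: () => Unit)
    extends CloseableIterator[T] {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = underlying.next()
  override def close(): Unit = onClose()
}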

* Added a task listener to CurrentBatchIterator

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
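
The listener mechanism is standard Spark: a task-completion callback guarantees cleanup even when a task finishes without draining the iterator. A minimal sketch, assuming a generic AutoCloseable stand-in for CurrentBatchIterator:

import org.apache.spark.TaskContext

// Close the resource when the task finishes, whether or not the
// iterator was fully consumed (limits, failures, early exit).
def closeOnTaskCompletion(resource: AutoCloseable): Unit = {
  Option(TaskContext.get()).foreach { tc =>
    tc.addTaskCompletionListener[Unit](_ => resource.close())
  }
}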

* Revert change to the test that limits the DataTypes

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

Co-authored-by: Raza Jafri <rjafri@nvidia.com>
razajafri authored Jul 21, 2021
1 parent 5f72553 commit 84a82f9
Showing 3 changed files with 330 additions and 245 deletions.
44 changes: 44 additions & 0 deletions integration_tests/src/main/python/cache_test.py
@@ -279,3 +279,47 @@ def helper(spark):

# NOTE: we aren't comparing cpu and gpu results, we are comparing the cached and non-cached results.
assert_equal(reg_result, cached_result)

def function_to_test_on_cached_df(with_x_session, func, data_gen, test_conf):
def with_cache(cached):
def helper(spark):
df = unary_op_df(spark, data_gen)
if cached:
df.cache().count()
return func(df)
return helper

reg_result = with_x_session(with_cache(False), test_conf)
cached_result = with_x_session(with_cache(True), test_conf)

assert_equal(reg_result, cached_result)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('with_x_session', [with_gpu_session, with_cpu_session])
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@pytest.mark.parametrize('batch_size', [{"spark.rapids.sql.batchSizeBytes": "100"}, {}], ids=idfn)
@ignore_order
def test_cache_count(data_gen, with_x_session, enable_vectorized_conf, batch_size):
test_conf = enable_vectorized_conf.copy()
test_conf.update(allow_negative_scale_of_decimal_conf)
test_conf.update(batch_size)

function_to_test_on_cached_df(with_x_session, lambda df: df.count(), data_gen, test_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('with_x_session', [with_cpu_session, with_gpu_session])
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@pytest.mark.parametrize('batch_size', [{"spark.rapids.sql.batchSizeBytes": "100"}, {}], ids=idfn)
@ignore_order
# This tests the cached and uncached values returned by collect on the CPU and GPU.
# When running on the GPU with the DefaultCachedBatchSerializer, Spark adds a ColumnarToRowExec
# to project the results for collect. That exec does not run on the GPU, which would make this
# test throw an exception, so it has to be added to the `allowed` list. There is currently no way
# to limit the scope of allow_non_gpu based on a condition, so it must be allowed in all cases.
@allow_non_gpu('ColumnarToRowExec')
def test_cache_multi_batch(data_gen, with_x_session, enable_vectorized_conf, batch_size):
test_conf = enable_vectorized_conf.copy()
test_conf.update(allow_negative_scale_of_decimal_conf)
test_conf.update(batch_size)

function_to_test_on_cached_df(with_x_session, lambda df: df.collect(), data_gen, test_conf)
