Skip to content

Commit

Permalink
Fix some memoization logic in groupby/sort/sort_helper.cu (#13521)
Browse files Browse the repository at this point in the history
Fixes some logic in `groupby/sort/sort_helper.cu` to improve memoization safety. Some functions cache their results so that they can be called several times without recomputing them. This change ensures each cached member variable is fully formed before it is assigned and returned, so that parallel calls do not get a partially built result. The change also simplifies some of the logic by replacing some thrust calls with equivalent `cudf::detail` function calls that return columns.

Found while working on #13489

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)

URL: #13521
  • Loading branch information
davidwendt authored Jun 9, 2023
1 parent c49bfd1 commit 3e32dc3
Showing 1 changed file with 33 additions and 39 deletions.
72 changes: 33 additions & 39 deletions cpp/src/groupby/sort/sort_helper.cu
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/labeling/label_segments.cuh>
#include <cudf/detail/scatter.hpp>
#include <cudf/detail/sequence.hpp>
#include <cudf/detail/sorting.hpp>
#include <cudf/strings/string_view.hpp>
#include <cudf/table/experimental/row_operators.cuh>
Expand All @@ -37,10 +38,8 @@
#include <rmm/exec_policy.hpp>

#include <thrust/distance.h>
#include <thrust/fill.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/sequence.h>
#include <thrust/unique.h>

#include <algorithm>
Expand Down Expand Up @@ -95,20 +94,12 @@ column_view sort_groupby_helper::key_sort_order(rmm::cuda_stream_view stream)

if (_key_sorted_order) { return sliced_key_sorted_order(); }

// TODO (dm): optimization. When keys are pre sorted but ignore nulls is true,
// we still want all rows with nulls in the end. Sort is costly, so
// do a copy_if(counting, sorted_order, {bitmask.is_valid(i)})
if (_keys_pre_sorted == sorted::YES) {
_key_sorted_order = make_numeric_column(
data_type(type_to_id<size_type>()), _keys.num_rows(), mask_state::UNALLOCATED, stream);

auto d_key_sorted_order = _key_sorted_order->mutable_view().data<size_type>();

thrust::sequence(rmm::exec_policy(stream),
d_key_sorted_order,
d_key_sorted_order + _key_sorted_order->size(),
0);

_key_sorted_order = cudf::detail::sequence(_keys.num_rows(),
numeric_scalar<size_type>(0),
numeric_scalar<size_type>(1),
stream,
rmm::mr::get_current_device_resource());
return sliced_key_sorted_order();
}

Expand Down Expand Up @@ -147,12 +138,14 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_offsets(
if (_group_offsets) return *_group_offsets;

auto const size = num_keys(stream);
_group_offsets = std::make_unique<index_vector>(size + 1, stream);
// Create a temporary variable and only set _group_offsets right before the return.
// This way, a 2nd (parallel) call to this will not be given a partially created object.
auto group_offsets = std::make_unique<index_vector>(size + 1, stream);

auto const comparator = cudf::experimental::row::equality::self_comparator{_keys, stream};

auto const sorted_order = key_sort_order(stream).data<size_type>();
decltype(_group_offsets->begin()) result_end;
decltype(group_offsets->begin()) result_end;

if (cudf::detail::has_nested_columns(_keys)) {
auto const d_key_equal = comparator.equal_to<true>(
Expand All @@ -170,22 +163,23 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_offsets(
itr,
itr + size,
result.begin(),
_group_offsets->begin(),
group_offsets->begin(),
thrust::identity<bool>{});
} else {
auto const d_key_equal = comparator.equal_to<false>(
cudf::nullate::DYNAMIC{cudf::has_nested_nulls(_keys)}, null_equality::EQUAL);
result_end = thrust::unique_copy(rmm::exec_policy(stream),
thrust::counting_iterator<size_type>(0),
thrust::counting_iterator<size_type>(size),
_group_offsets->begin(),
group_offsets->begin(),
permuted_row_equality_comparator(d_key_equal, sorted_order));
}

size_type num_groups = thrust::distance(_group_offsets->begin(), result_end);
_group_offsets->set_element(num_groups, size, stream);
_group_offsets->resize(num_groups + 1, stream);
auto const num_groups = thrust::distance(group_offsets->begin(), result_end);
group_offsets->set_element_async(num_groups, size, stream);
group_offsets->resize(num_groups + 1, stream);

_group_offsets = std::move(group_offsets);
return *_group_offsets;
}

Expand All @@ -194,18 +188,18 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_labels(
{
if (_group_labels) return *_group_labels;

// Get group labels for future use in segmented sorting
_group_labels = std::make_unique<index_vector>(num_keys(stream), stream);
// Create a temporary variable and only set _group_labels right before the return.
// This way, a 2nd (parallel) call to this will not be given a partially created object.
auto group_labels = std::make_unique<index_vector>(num_keys(stream), stream);

auto& group_labels = *_group_labels;
if (num_keys(stream) == 0) return group_labels;
if (num_keys(stream)) {
auto const& offsets = group_offsets(stream);
cudf::detail::label_segments(
offsets.begin(), offsets.end(), group_labels->begin(), group_labels->end(), stream);
}

cudf::detail::label_segments(group_offsets(stream).begin(),
group_offsets(stream).end(),
group_labels.begin(),
group_labels.end(),
stream);
return group_labels;
_group_labels = std::move(group_labels);
return *_group_labels;
}

column_view sort_groupby_helper::unsorted_keys_labels(rmm::cuda_stream_view stream)
Expand Down Expand Up @@ -242,14 +236,14 @@ column_view sort_groupby_helper::keys_bitmask_column(rmm::cuda_stream_view strea
auto [row_bitmask, null_count] =
cudf::detail::bitmask_and(_keys, stream, rmm::mr::get_current_device_resource());

_keys_bitmask_column = make_numeric_column(
data_type(type_id::INT8), _keys.num_rows(), std::move(row_bitmask), null_count, stream);

auto keys_bitmask_view = _keys_bitmask_column->mutable_view();
using T = id_to_type<type_id::INT8>;
thrust::fill(
rmm::exec_policy(stream), keys_bitmask_view.begin<T>(), keys_bitmask_view.end<T>(), 0);
auto const zero = numeric_scalar<int8_t>(0);
// Create a temporary variable and only set _keys_bitmask_column right before the return.
// This way, a 2nd (parallel) call to this will not be given a partially created object.
auto keys_bitmask_column = cudf::detail::sequence(
_keys.num_rows(), zero, zero, stream, rmm::mr::get_current_device_resource());
keys_bitmask_column->set_null_mask(std::move(row_bitmask), null_count);

_keys_bitmask_column = std::move(keys_bitmask_column);
return _keys_bitmask_column->view();
}

Expand Down

0 comments on commit 3e32dc3

Please sign in to comment.