Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some memoization logic in groupby/sort/sort_helper.cu #13521

Merged
72 changes: 33 additions & 39 deletions cpp/src/groupby/sort/sort_helper.cu
Original file line number | Diff line number | Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/labeling/label_segments.cuh>
#include <cudf/detail/scatter.hpp>
#include <cudf/detail/sequence.hpp>
#include <cudf/detail/sorting.hpp>
#include <cudf/strings/string_view.hpp>
#include <cudf/table/experimental/row_operators.cuh>
Expand All @@ -37,10 +38,8 @@
#include <rmm/exec_policy.hpp>

#include <thrust/distance.h>
#include <thrust/fill.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/sequence.h>
#include <thrust/unique.h>

#include <algorithm>
Expand Down Expand Up @@ -95,20 +94,12 @@ column_view sort_groupby_helper::key_sort_order(rmm::cuda_stream_view stream)

if (_key_sorted_order) { return sliced_key_sorted_order(); }

// TODO (dm): optimization. When keys are pre sorted but ignore nulls is true,
// we still want all rows with nulls in the end. Sort is costly, so
// do a copy_if(counting, sorted_order, {bitmask.is_valid(i)})
if (_keys_pre_sorted == sorted::YES) {
_key_sorted_order = make_numeric_column(
data_type(type_to_id<size_type>()), _keys.num_rows(), mask_state::UNALLOCATED, stream);

auto d_key_sorted_order = _key_sorted_order->mutable_view().data<size_type>();

thrust::sequence(rmm::exec_policy(stream),
d_key_sorted_order,
d_key_sorted_order + _key_sorted_order->size(),
0);

_key_sorted_order = cudf::detail::sequence(_keys.num_rows(),
numeric_scalar<size_type>(0),
numeric_scalar<size_type>(1),
stream,
rmm::mr::get_current_device_resource());
return sliced_key_sorted_order();
}

Expand Down Expand Up @@ -147,12 +138,14 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_offsets(
if (_group_offsets) return *_group_offsets;

auto const size = num_keys(stream);
_group_offsets = std::make_unique<index_vector>(size + 1, stream);
// Create a temporary variable and only set _group_offsets right before the return.
// This way, a 2nd (parallel) call to this will not be given a partially created object.
auto group_offsets = std::make_unique<index_vector>(size + 1, stream);

auto const comparator = cudf::experimental::row::equality::self_comparator{_keys, stream};

auto const sorted_order = key_sort_order(stream).data<size_type>();
decltype(_group_offsets->begin()) result_end;
decltype(group_offsets->begin()) result_end;

if (cudf::detail::has_nested_columns(_keys)) {
auto const d_key_equal = comparator.equal_to<true>(
Expand All @@ -170,22 +163,23 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_offsets(
itr,
itr + size,
result.begin(),
_group_offsets->begin(),
group_offsets->begin(),
thrust::identity<bool>{});
} else {
auto const d_key_equal = comparator.equal_to<false>(
cudf::nullate::DYNAMIC{cudf::has_nested_nulls(_keys)}, null_equality::EQUAL);
result_end = thrust::unique_copy(rmm::exec_policy(stream),
thrust::counting_iterator<size_type>(0),
thrust::counting_iterator<size_type>(size),
_group_offsets->begin(),
group_offsets->begin(),
permuted_row_equality_comparator(d_key_equal, sorted_order));
}

size_type num_groups = thrust::distance(_group_offsets->begin(), result_end);
_group_offsets->set_element(num_groups, size, stream);
_group_offsets->resize(num_groups + 1, stream);
auto const num_groups = thrust::distance(group_offsets->begin(), result_end);
group_offsets->set_element_async(num_groups, size, stream);
group_offsets->resize(num_groups + 1, stream);

_group_offsets = std::move(group_offsets);
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
return *_group_offsets;
}

Expand All @@ -194,18 +188,18 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_labels(
{
if (_group_labels) return *_group_labels;

// Get group labels for future use in segmented sorting
_group_labels = std::make_unique<index_vector>(num_keys(stream), stream);
// Create a temporary variable and only set _group_labels right before the return.
// This way, a 2nd (parallel) call to this will not be given a partially created object.
auto group_labels = std::make_unique<index_vector>(num_keys(stream), stream);
ttnghia marked this conversation as resolved.
Show resolved Hide resolved

auto& group_labels = *_group_labels;
if (num_keys(stream) == 0) return group_labels;
if (num_keys(stream)) {
auto const& offsets = group_offsets(stream);
cudf::detail::label_segments(
offsets.begin(), offsets.end(), group_labels->begin(), group_labels->end(), stream);
}

cudf::detail::label_segments(group_offsets(stream).begin(),
group_offsets(stream).end(),
group_labels.begin(),
group_labels.end(),
stream);
return group_labels;
_group_labels = std::move(group_labels);
return *_group_labels;
}

column_view sort_groupby_helper::unsorted_keys_labels(rmm::cuda_stream_view stream)
Expand Down Expand Up @@ -242,14 +236,14 @@ column_view sort_groupby_helper::keys_bitmask_column(rmm::cuda_stream_view strea
auto [row_bitmask, null_count] =
cudf::detail::bitmask_and(_keys, stream, rmm::mr::get_current_device_resource());

_keys_bitmask_column = make_numeric_column(
data_type(type_id::INT8), _keys.num_rows(), std::move(row_bitmask), null_count, stream);

auto keys_bitmask_view = _keys_bitmask_column->mutable_view();
using T = id_to_type<type_id::INT8>;
thrust::fill(
rmm::exec_policy(stream), keys_bitmask_view.begin<T>(), keys_bitmask_view.end<T>(), 0);
auto const zero = numeric_scalar<int8_t>(0);
// Create a temporary variable and only set _keys_bitmask_column right before the return.
// This way, a 2nd (parallel) call to this will not be given a partially created object.
auto keys_bitmask_column = cudf::detail::sequence(
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
_keys.num_rows(), zero, zero, stream, rmm::mr::get_current_device_resource());
keys_bitmask_column->set_null_mask(std::move(row_bitmask), null_count);

_keys_bitmask_column = std::move(keys_bitmask_column);
return _keys_bitmask_column->view();
}

Expand Down