Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43953: [C++] Add tests based on random data and benchmarks to ChunkResolver::ResolveMany #43954

Merged
merged 15 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ add_arrow_test(sparse_tensor_test)
add_arrow_test(stl_test SOURCES stl_iterator_test.cc stl_test.cc)

add_arrow_benchmark(builder_benchmark)
add_arrow_benchmark(chunk_resolver_benchmark)
add_arrow_benchmark(compare_benchmark)
add_arrow_benchmark(memory_pool_benchmark)
add_arrow_benchmark(type_benchmark)
Expand Down
104 changes: 59 additions & 45 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,43 +55,57 @@ inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
return offsets;
}

/// \brief Resolve a single logical index into a (chunk, index-in-chunk) pair.
///
/// Tries the caller-provided chunk hint first and only falls back to a
/// binary search over the offsets array when the hint does not contain the
/// logical index.
///
/// \param num_offsets number of entries in offsets (= chunks.size() + 1)
/// \param offsets monotonically non-decreasing chunk start offsets
/// \param typed_logical_index logical index into the chunked array
/// \param num_chunks chunks.size() (== num_offsets - 1)
/// \param chunk_hint index of the chunk that resolved the previous lookup
template <typename IndexType>
inline TypedChunkLocation<IndexType> ResolveOneInline(uint32_t num_offsets,
                                                      const uint64_t* offsets,
                                                      IndexType typed_logical_index,
                                                      int32_t num_chunks,
                                                      int32_t chunk_hint) {
  const auto logical_index = static_cast<uint64_t>(typed_logical_index);
  // Fast path: the hinted chunk already contains the logical index. When the
  // hint is the end sentinel (chunk_hint == num_chunks) the upper-bound test
  // is skipped, which also avoids reading offsets[chunk_hint + 1] past the
  // end of the array.
  const bool hint_is_valid =
      logical_index >= offsets[chunk_hint] &&
      (chunk_hint == num_chunks || logical_index < offsets[chunk_hint + 1]);
  if (!hint_is_valid) {
    // lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
    chunk_hint = static_cast<int32_t>(
        ChunkResolver::Bisect(logical_index, offsets, /*lo=*/0, /*hi=*/num_offsets));
  }
  // The resolved chunk index is in [0, chunks.size()] for any value of the
  // logical index, so indexing offsets (chunks.size() + 1 entries) is safe.
  TypedChunkLocation<IndexType> location(
      /*chunk_index=*/chunk_hint,
      /*index_in_chunk=*/typed_logical_index -
          static_cast<IndexType>(offsets[chunk_hint]));
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
  // Poison index-in-chunk when the logical index is out-of-bounds so that
  // Valgrind/ASAN are more likely to catch an invalid memory access.
  if (static_cast<int32_t>(location.chunk_index) == num_chunks) {
    location.index_in_chunk = std::numeric_limits<IndexType>::max();
  }
#endif
  return location;
}

/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
template <typename IndexType>
void ResolveManyInline(size_t num_offsets, const int64_t* signed_offsets,
void ResolveManyInline(uint32_t num_offsets, const int64_t* signed_offsets,
int64_t n_indices, const IndexType* logical_index_vec,
TypedChunkLocation<IndexType>* out_chunk_location_vec,
IndexType chunk_hint) {
int32_t chunk_hint) {
auto* offsets = reinterpret_cast<const uint64_t*>(signed_offsets);
const auto num_chunks = static_cast<IndexType>(num_offsets - 1);
const auto num_chunks = static_cast<int32_t>(num_offsets - 1);
// chunk_hint in [0, num_offsets) per the precondition.
for (int64_t i = 0; i < n_indices; i++) {
auto typed_logical_index = logical_index_vec[i];
const auto index = static_cast<uint64_t>(typed_logical_index);
// use or update chunk_hint
if (index >= offsets[chunk_hint] &&
(chunk_hint == num_chunks || index < offsets[chunk_hint + 1])) {
// hint is correct!
} else {
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
auto chunk_index =
ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
chunk_hint = static_cast<IndexType>(chunk_index);
}
out_chunk_location_vec[i].chunk_index = chunk_hint;
// chunk_index is in [0, chunks.size()] no matter what the
// value of logical_index is, so it's always safe to dereference
// offset_ as it contains chunks.size()+1 values.
out_chunk_location_vec[i].index_in_chunk =
typed_logical_index - static_cast<IndexType>(offsets[chunk_hint]);
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
// Make it more likely that Valgrind/ASAN can catch an invalid memory
// access by poisoning the index-in-chunk value when the logical
// index is out-of-bounds.
if (chunk_hint == num_chunks) {
out_chunk_location_vec[i].index_in_chunk = std::numeric_limits<IndexType>::max();
}
#endif
const auto typed_logical_index = logical_index_vec[i];
const auto loc = ResolveOneInline(num_offsets, offsets, typed_logical_index,
num_chunks, chunk_hint);
out_chunk_location_vec[i] = loc;
chunk_hint = static_cast<int32_t>(loc.chunk_index);
}
}

Expand Down Expand Up @@ -127,30 +141,30 @@ ChunkResolver& ChunkResolver::operator=(const ChunkResolver& other) noexcept {

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint8_t* logical_index_vec,
TypedChunkLocation<uint8_t>* out_chunk_location_vec,
uint8_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
TypedChunkLocation<uint32_t>* out_chunk_location_vec,
uint32_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
int32_t chunk_hint) const {
ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
logical_index_vec, out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint16_t* logical_index_vec,
TypedChunkLocation<uint16_t>* out_chunk_location_vec,
uint16_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
int32_t chunk_hint) const {
ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
logical_index_vec, out_chunk_location_vec, chunk_hint);
}

// Resolve a batch of uint32_t logical indices into chunk locations.
// Thin dispatch wrapper: forwards to the templated ResolveManyInline() with
// this resolver's offsets array. The narrowing cast of offsets_.size() to
// uint32_t is safe given the class invariant (asserted in the constructor)
// that offsets_.size() - 1 fits in int32_t.
void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
                                    TypedChunkLocation<uint32_t>* out_chunk_location_vec,
                                    int32_t chunk_hint) const {
  ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
                    logical_index_vec, out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint64_t* logical_index_vec,
TypedChunkLocation<uint64_t>* out_chunk_location_vec,
uint64_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
int32_t chunk_hint) const {
ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
logical_index_vec, out_chunk_location_vec, chunk_hint);
}

} // namespace arrow::internal
57 changes: 30 additions & 27 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct ARROW_EXPORT ChunkResolver {
/// \brief Cache of the index of the last resolved chunk.
///
/// \invariant `cached_chunk_ in [0, chunks.size()]`
mutable std::atomic<int64_t> cached_chunk_;
mutable std::atomic<int32_t> cached_chunk_;

public:
explicit ChunkResolver(const ArrayVector& chunks) noexcept;
Expand All @@ -92,6 +92,8 @@ struct ARROW_EXPORT ChunkResolver {
for (size_t i = 1; i < offsets_.size(); i++) {
assert(offsets_[i] >= offsets_[i - 1]);
}
assert(offsets_.size() - 1 <=
static_cast<size_t>(std::numeric_limits<int32_t>::max()));
#endif
}

Expand All @@ -102,7 +104,7 @@ struct ARROW_EXPORT ChunkResolver {
ChunkResolver& operator=(const ChunkResolver& other) noexcept;

int64_t logical_array_length() const { return offsets_.back(); }
int64_t num_chunks() const { return static_cast<int64_t>(offsets_.size()) - 1; }
int32_t num_chunks() const { return static_cast<int32_t>(offsets_.size() - 1); }

int64_t chunk_length(int64_t chunk_index) const {
return offsets_[chunk_index + 1] - offsets_[chunk_index];
Expand Down Expand Up @@ -140,9 +142,9 @@ struct ARROW_EXPORT ChunkResolver {
/// bounds, or with chunk_index == chunks.size() if logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation ResolveWithHint(int64_t index, ChunkLocation hint) const {
assert(hint.chunk_index < static_cast<int64_t>(offsets_.size()));
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/false>(index, hint.chunk_index);
assert(hint.chunk_index < static_cast<uint32_t>(offsets_.size()));
const auto chunk_index = ResolveChunkIndex</*StoreCachedChunk=*/false>(
index, static_cast<int32_t>(hint.chunk_index));
return ChunkLocation{chunk_index, index - offsets_[chunk_index]};
}

Expand All @@ -169,13 +171,12 @@ struct ARROW_EXPORT ChunkResolver {
[[nodiscard]] bool ResolveMany(int64_t n_indices, const IndexType* logical_index_vec,
TypedChunkLocation<IndexType>* out_chunk_location_vec,
IndexType chunk_hint = 0) const {
if constexpr (sizeof(IndexType) < sizeof(uint64_t)) {
if constexpr (sizeof(IndexType) < sizeof(uint32_t)) {
// The max value returned by Bisect is `offsets.size() - 1` (= chunks.size()).
constexpr uint64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
constexpr int64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
// A ChunkedArray with enough empty chunks can make the index of a chunk
// exceed the logical index and thus the maximum value of IndexType.
const bool chunk_index_fits_on_type =
static_cast<uint64_t>(offsets_.size() - 1) <= kMaxIndexTypeValue;
const bool chunk_index_fits_on_type = num_chunks() <= kMaxIndexTypeValue;
if (ARROW_PREDICT_FALSE(!chunk_index_fits_on_type)) {
return false;
}
Expand All @@ -194,34 +195,36 @@ struct ARROW_EXPORT ChunkResolver {
using U = std::make_unsigned_t<IndexType>;
ResolveManyImpl(n_indices, reinterpret_cast<const U*>(logical_index_vec),
reinterpret_cast<TypedChunkLocation<U>*>(out_chunk_location_vec),
static_cast<U>(chunk_hint));
static_cast<int32_t>(chunk_hint));
} else {
static_assert(std::is_unsigned_v<IndexType>);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_location_vec, chunk_hint);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_location_vec,
static_cast<int32_t>(chunk_hint));
}
return true;
}

private:
template <bool StoreCachedChunk>
inline int64_t ResolveChunkIndex(int64_t index, int64_t cached_chunk) const {
inline int64_t ResolveChunkIndex(int64_t index, int32_t cached_chunk) const {
// It is common for algorithms sequentially processing arrays to make consecutive
// accesses at a relatively small distance from each other, hence often falling in the
// same chunk.
//
// This is guaranteed when merging (assuming each side of the merge uses its
// own resolver), and is the most common case in recursive invocations of
// partitioning.
const auto num_offsets = static_cast<int64_t>(offsets_.size());
const auto num_offsets = static_cast<uint32_t>(offsets_.size());
const int64_t* offsets = offsets_.data();
if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) &&
(cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) {
(static_cast<uint32_t>(cached_chunk + 1) == num_offsets ||
index < offsets[cached_chunk + 1])) {
return cached_chunk;
}
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
if constexpr (StoreCachedChunk) {
assert(chunk_index < static_cast<int64_t>(offsets_.size()));
assert(static_cast<uint32_t>(chunk_index) < static_cast<uint32_t>(offsets_.size()));
cached_chunk_.store(chunk_index, std::memory_order_relaxed);
}
return chunk_index;
Expand All @@ -230,13 +233,13 @@ struct ARROW_EXPORT ChunkResolver {
/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
void ResolveManyImpl(int64_t, const uint8_t*, TypedChunkLocation<uint8_t>*,
uint8_t) const;
int32_t) const;
void ResolveManyImpl(int64_t, const uint16_t*, TypedChunkLocation<uint16_t>*,
uint16_t) const;
int32_t) const;
void ResolveManyImpl(int64_t, const uint32_t*, TypedChunkLocation<uint32_t>*,
uint32_t) const;
int32_t) const;
void ResolveManyImpl(int64_t, const uint64_t*, TypedChunkLocation<uint64_t>*,
uint64_t) const;
int32_t) const;

public:
/// \brief Find the index of the chunk that contains the logical index.
Expand All @@ -249,24 +252,24 @@ struct ARROW_EXPORT ChunkResolver {
/// \pre index >= 0 (otherwise, when index is negative, hi-1 is returned)
/// \pre lo < hi
/// \pre lo >= 0 && hi <= offsets_.size()
static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo,
int64_t hi) {
static inline int32_t Bisect(int64_t index, const int64_t* offsets, int32_t lo,
int32_t hi) {
return Bisect(static_cast<uint64_t>(index),
reinterpret_cast<const uint64_t*>(offsets), static_cast<uint64_t>(lo),
static_cast<uint64_t>(hi));
reinterpret_cast<const uint64_t*>(offsets), static_cast<uint32_t>(lo),
static_cast<uint32_t>(hi));
}

static inline int64_t Bisect(uint64_t index, const uint64_t* offsets, uint64_t lo,
uint64_t hi) {
static inline int32_t Bisect(uint64_t index, const uint64_t* offsets, uint32_t lo,
uint32_t hi) {
// Similar to std::upper_bound(), but slightly different as our offsets
// array always starts with 0.
auto n = hi - lo;
// First iteration does not need to check for n > 1
// (lo < hi is guaranteed by the precondition).
assert(n > 1 && "lo < hi is a precondition of Bisect");
do {
const uint64_t m = n >> 1;
const uint64_t mid = lo + m;
const uint32_t m = n >> 1;
const uint32_t mid = lo + m;
if (index >= offsets[mid]) {
lo = mid;
n -= m;
Expand Down
Loading
Loading