Clean up and simplify gpuDecideCompression #13202

Merged · 13 commits · May 2, 2023 · Changes from 11 commits
88 changes: 45 additions & 43 deletions cpp/src/io/parquet/page_enc.cu
@@ -1315,56 +1315,56 @@ __global__ void __launch_bounds__(128, 8)
}
}

// blockDim(128, 1, 1)
__global__ void __launch_bounds__(128) gpuDecideCompression(device_span<EncColumnChunk> chunks)
constexpr int decide_compression_warps_in_block = 4;
constexpr int decide_compression_block_size =
decide_compression_warps_in_block * cudf::detail::warp_size;

// blockDim(decide_compression_block_size, 1, 1)
__global__ void __launch_bounds__(decide_compression_block_size)
gpuDecideCompression(device_span<EncColumnChunk> chunks)
{
// After changing the way structs are loaded from coop to normal, this kernel has no business
// being launched with 128 thread block. It can easily be a single warp.
__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(4) unsigned int error_count;
__shared__ __align__(8) EncColumnChunk ck_g[decide_compression_warps_in_block];
__shared__ __align__(4) unsigned int compression_error[decide_compression_warps_in_block];
Comment on lines +1326 to +1327

Contributor:
Why do we align them manually? And why do we need to align them?

Contributor Author:
It allows more efficient access, at least in theory. I'm not the one who added the alignment, and I also haven't tested how this alignment impacts performance in practice.
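
For context, a minimal sketch of the idea behind the explicit alignment (hypothetical types, not the PR's code): requesting 8-byte alignment on the shared copy lets the compiler use a wider access when staging the struct, which is the "more efficient access, at least in theory" mentioned above.

#include <cstdint>

struct TwoWords {  // hypothetical stand-in for a small chunk-like struct
  uint32_t a;
  uint32_t b;
};

__global__ void stage_kernel(TwoWords const* in)
{
  // With the explicit 8-byte alignment the copy below may be emitted as a single
  // 64-bit access instead of two 32-bit ones (in theory; untested, as noted above).
  __shared__ __align__(8) TwoWords staged;
  if (threadIdx.x == 0) { staged = in[blockIdx.x]; }
  __syncthreads();
  // ... all threads read `staged` ...
}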

using warp_reduce = cub::WarpReduce<uint32_t>;
__shared__ typename warp_reduce::TempStorage temp_storage[2];
__shared__ volatile bool has_compression;
__shared__ typename warp_reduce::TempStorage temp_storage[decide_compression_warps_in_block][2];

uint32_t t = threadIdx.x;
uint32_t uncompressed_data_size = 0;
uint32_t compressed_data_size = 0;
uint32_t num_pages;
auto const lane_id = threadIdx.x % cudf::detail::warp_size;
auto const warp_id = threadIdx.x / cudf::detail::warp_size;
Comment on lines +1331 to +1332

Contributor Author:
Question for the reviewers: Are there maybe helper functions for this? Looks very generic.

Contributor:
Not that I am aware of.
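
If such helpers did exist, they would presumably look something like the sketch below (hypothetical names, not existing cudf API; the header is an assumption about where cudf::detail::warp_size lives). The PR keeps the two expressions inline instead.

#include <cudf/detail/utilities/cuda.cuh>  // assumed location of cudf::detail::warp_size

// Hypothetical helpers wrapping the generic lane/warp index computation.
__device__ inline int lane_index() { return static_cast<int>(threadIdx.x) % cudf::detail::warp_size; }
__device__ inline int warp_index() { return static_cast<int>(threadIdx.x) / cudf::detail::warp_size; }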

auto const chunk_id = blockIdx.x * decide_compression_warps_in_block + warp_id;

if (t == 0) {
ck_g = chunks[blockIdx.x];
atomicAnd(&error_count, 0);
has_compression = false;
if (chunk_id >= chunks.size()) { return; }

if (lane_id == 0) {
ck_g[warp_id] = chunks[chunk_id];
compression_error[warp_id] = 0;
}
__syncthreads();
if (t < 32) {
num_pages = ck_g.num_pages;
for (uint32_t page = t; page < num_pages; page += 32) {
auto& curr_page = ck_g.pages[page];
uint32_t page_data_size = curr_page.max_data_size;
uncompressed_data_size += page_data_size;
if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
has_compression = true;
compressed_data_size += comp_res->bytes_written;
if (comp_res->status != compression_status::SUCCESS) { atomicAdd(&error_count, 1); }
__syncwarp();
Contributor:
Can we write this kernel in a block-size-agnostic way? Unlike __syncthreads();, using __syncwarp(); assumes that block_size == warp_size == 32.

Contributor Author:
That depends on how we scale the parallelism across multiple warps. If multiple warps worked on a single chunks element then, yes, we would need to sync all threads in the block. But with multiple warps, IMO this kernel should have each warp work on a separate chunks element. In that case we don't need to synchronize different warps, and __syncwarp is still the right option.
I understand that my change left this ambiguous, as the warp size is used interchangeably for the block size and the warp size. I'll try to make this clearer.

Contributor Author:
Modified the kernel to work with any number of warps in a block. The size can be adjusted via the constexpr decide_compression_warps_in_block. Used warp_size as well, so we should be magic-number-free now :)
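
To make the warp-per-chunk idea above concrete, here is a standalone sketch (a hypothetical kernel, not the PR's code) of the pattern being discussed: each warp owns exactly one input element, so only intra-warp synchronization is ever needed and __syncwarp suffices.

constexpr int warp_size       = 32;
constexpr int warps_per_block = 4;
constexpr int block_size      = warps_per_block * warp_size;

__global__ void __launch_bounds__(block_size)
  warp_per_item_kernel(int const* items, int* out, int num_items)
{
  int const lane_id = threadIdx.x % warp_size;
  int const warp_id = threadIdx.x / warp_size;
  int const item_id = blockIdx.x * warps_per_block + warp_id;
  // The whole warp exits together, so no other warp ever waits on it.
  if (item_id >= num_items) { return; }

  __shared__ int staged[warps_per_block];
  if (lane_id == 0) { staged[warp_id] = items[item_id]; }
  // Only this warp reads staged[warp_id], so synchronizing the warp is sufficient.
  __syncwarp();

  if (lane_id == 0) { out[item_id] = staged[warp_id] * 2; }
}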


uint32_t uncompressed_data_size = 0;
uint32_t compressed_data_size = 0;
auto const num_pages = ck_g[warp_id].num_pages;
for (auto page = lane_id; page < num_pages; page += cudf::detail::warp_size) {
auto& curr_page = ck_g[warp_id].pages[page];
auto const page_data_size = curr_page.max_data_size;
uncompressed_data_size += page_data_size;
if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
compressed_data_size += comp_res->bytes_written;
if (comp_res->status != compression_status::SUCCESS) {
atomicOr(&compression_error[warp_id], 1);
}
}
uncompressed_data_size = warp_reduce(temp_storage[0]).Sum(uncompressed_data_size);
compressed_data_size = warp_reduce(temp_storage[1]).Sum(compressed_data_size);
}
__syncthreads();
if (t == 0) {
bool is_compressed;
if (has_compression) {
uint32_t compression_error = atomicAdd(&error_count, 0);
is_compressed = (!compression_error && compressed_data_size < uncompressed_data_size);
} else {
is_compressed = false;
}
chunks[blockIdx.x].is_compressed = is_compressed;
chunks[blockIdx.x].bfr_size = uncompressed_data_size;
chunks[blockIdx.x].compressed_size =
(is_compressed) ? compressed_data_size : uncompressed_data_size;
uncompressed_data_size = warp_reduce(temp_storage[warp_id][0]).Sum(uncompressed_data_size);
compressed_data_size = warp_reduce(temp_storage[warp_id][1]).Sum(compressed_data_size);
__syncwarp();

if (lane_id == 0) {
auto const write_compressed = compressed_data_size != 0 and compression_error[warp_id] == 0 and
compressed_data_size < uncompressed_data_size;
chunks[chunk_id].is_compressed = write_compressed;
chunks[chunk_id].bfr_size = uncompressed_data_size;
chunks[chunk_id].compressed_size =
write_compressed ? compressed_data_size : uncompressed_data_size;
}
}

@@ -2167,7 +2167,9 @@ void EncodePages(device_span<gpu::EncPage> pages,

void DecideCompression(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
{
gpuDecideCompression<<<chunks.size(), 128, 0, stream.value()>>>(chunks);
auto const num_blocks =
util::div_rounding_up_safe<int>(chunks.size(), decide_compression_warps_in_block);
gpuDecideCompression<<<num_blocks, decide_compression_block_size, 0, stream.value()>>>(chunks);
}
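
A short note on the new launch arithmetic: with decide_compression_warps_in_block chunks handled per block, the grid needs ceil(chunks.size() / warps_in_block) blocks. The sketch below uses a hypothetical div_rounding_up stand-in for util::div_rounding_up_safe, shown only to illustrate the rounding-up division (the real cudf utility is, as its name suggests, also written to avoid overflow in the intermediate addition).

#include <cstddef>

// Hypothetical stand-in for util::div_rounding_up_safe, for illustration only.
constexpr int div_rounding_up(std::size_t num, int divisor)
{
  return static_cast<int>((num + static_cast<std::size_t>(divisor) - 1) / divisor);
}

// e.g. 10 chunks with 4 warps (chunks) per block -> 3 blocks of 128 threads; the
// two spare warps in the last block return early via the chunk_id bounds check.
static_assert(div_rounding_up(10, 4) == 3);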

void EncodePageHeaders(device_span<EncPage> pages,