Fix multi-source reading in JSON byte range reader #15671

Merged · 11 commits · May 10, 2024
54 changes: 27 additions & 27 deletions cpp/src/io/json/read_json.cu
@@ -50,7 +50,10 @@ size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
}

/**
- * @brief Read from array of data sources into RMM buffer
+ * @brief Read from array of data sources into RMM buffer. The size of the returned device span
+ * can be larger than the number of bytes requested from the list of sources when
+ * the range to be read spans across multiple sources. This is due to the delimiter
+ * characters inserted after the end of each accessed source.
*
* @param buffer Device span buffer to which data is read
* @param sources Array of data sources
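
To make the new contract concrete, here is a minimal host-side sketch (illustration only, not part of the diff; the helper name and the bound it computes are assumptions) of the size relationship the updated @brief describes:

// Illustration only (assumed helper, not cudf code): for uncompressed input,
// the span returned by ingest_raw_input covers the requested bytes plus one
// delimiter per boundary between the non-empty sources it actually read.
#include <cstddef>

std::size_t expected_span_size(std::size_t bytes_requested,
                               std::size_t num_sources_read,
                               std::size_t num_delimiter_chars = 1)
{
  // One delimiter is inserted after every non-empty source except the last.
  return bytes_requested +
         (num_sources_read > 0 ? (num_sources_read - 1) * num_delimiter_chars : 0);
}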
@@ -72,7 +75,6 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
// delimiter.
auto constexpr num_delimiter_chars = 1;
-auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1);

if (compression == compression_type::NONE) {
std::vector<size_type> delimiter_map{};
@@ -89,36 +91,36 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset);
size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);

-auto remaining_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
+auto total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
-for (size_t i = start_source; i < sources.size() && remaining_bytes_to_read; i++) {
+for (size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
if (sources[i]->is_empty()) continue;
-auto data_size = std::min(sources[i]->size() - range_offset, remaining_bytes_to_read);
-auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read;
+auto data_size =
+  std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
+auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
+                   (num_delimiter_chars * delimiter_map.size());
if (sources[i]->is_device_read_preferred(data_size)) {
bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
} else {
h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
auto const& h_buffer = h_buffers.back();
CUDF_CUDA_TRY(cudaMemcpyAsync(
-destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value()));
+destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value()));
bytes_read += h_buffer->size();
}
range_offset = 0;
-remaining_bytes_to_read -= bytes_read;
-delimiter_map.push_back(bytes_read);
-bytes_read += num_delimiter_chars;
+delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size()));
}
-// In the case where all sources are empty, bytes_read is zero
-if (bytes_read) bytes_read -= num_delimiter_chars;
+// Removing delimiter inserted after last non-empty source is read
+if (!delimiter_map.empty()) delimiter_map.pop_back();

// If this is a multi-file source, we scatter the JSON line delimiters between files
if (sources.size() > 1) {
static_assert(num_delimiter_chars == 1,
"Currently only single-character delimiters are supported");
auto const delimiter_source = thrust::make_constant_iterator('\n');
auto const d_delimiter_map = cudf::detail::make_device_uvector_async(
-host_span<size_type const>{delimiter_map.data(), delimiter_map.size() - 1},
+host_span<size_type const>{delimiter_map.data(), delimiter_map.size()},
[Review thread on the line above]

Contributor: Indeed, a vector is implicitly converted into host_span.

Suggested change:
-host_span<size_type const>{delimiter_map.data(), delimiter_map.size()},
+delimiter_map,

Contributor: host_span doesn't work well with templated calls. Maybe the explicit construction here can be avoided by specifying the type with make_device_uvector_async<size_type const>. The combination of implicit conversion and templated functions is pretty annoying.

Contributor (author): That's what I thought as well, but the implicit conversion works here 😲

Contributor: Ah, because it's calling a different overload of make_device_uvector_async:

rmm::device_uvector<typename Container::value_type> make_device_uvector_async(
  Container const& c, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr)
stream,
rmm::mr::get_current_device_resource());
thrust::scatter(rmm::exec_policy_nosync(stream),
@@ -128,7 +130,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
buffer.data());
}
stream.synchronize();
-return buffer.first(bytes_read);
+return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
}
// TODO: allow byte range reading from multiple compressed files.
auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset);
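
The heart of the fix is the offset bookkeeping above: each source's bytes are copied past a gap left for the delimiters already pending, and delimiter_map records where each '\n' will later be scattered. A host-side simulation of that arithmetic (illustration only; a plain std::string stands in for the device buffer, and the final loop stands in for thrust::scatter):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main()
{
  std::vector<std::string> sources{R"({"a":1})", R"({"a":2})", R"({"a":3})"};
  constexpr std::size_t num_delimiter_chars = 1;

  std::string buffer(64, '\0');  // generous upper bound for the demo
  std::size_t bytes_read = 0;
  std::vector<std::size_t> delimiter_map;

  for (auto const& src : sources) {
    // Copy destination skips one slot per delimiter already pending.
    auto const dst = bytes_read + num_delimiter_chars * delimiter_map.size();
    buffer.replace(dst, src.size(), src);
    bytes_read += src.size();
    // Record where the delimiter after this source will land.
    delimiter_map.push_back(bytes_read + num_delimiter_chars * delimiter_map.size());
  }
  if (!delimiter_map.empty()) delimiter_map.pop_back();  // none after the last source

  for (auto pos : delimiter_map) buffer[pos] = '\n';  // stands in for thrust::scatter

  buffer.resize(bytes_read + delimiter_map.size() * num_delimiter_chars);
  std::cout << buffer << '\n';  // prints {"a":1}\n{"a":2}\n{"a":3}
}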
@@ -151,17 +153,15 @@ size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::data
char const delimiter,
rmm::cuda_stream_view stream)
{
-auto const total_source_size =
-  sources_size(sources, reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size()) +
-  (sources.size() - 1);
+auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1);
rmm::device_uvector<char> buffer(total_source_size, stream);
-ingest_raw_input(buffer,
-                 sources,
-                 reader_opts.get_compression(),
-                 reader_opts.get_byte_range_offset(),
-                 reader_opts.get_byte_range_size(),
-                 stream);
-return find_first_delimiter(buffer, delimiter, stream);
+auto readbufspan = ingest_raw_input(buffer,
+                                    sources,
+                                    reader_opts.get_compression(),
+                                    reader_opts.get_byte_range_offset(),
+                                    reader_opts.get_byte_range_size(),
+                                    stream);
+return find_first_delimiter(readbufspan, '\n', stream);
}

/**
@@ -195,8 +195,7 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
"Invalid offsetting");
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
-chunk_size =
-  should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size;
+chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

// Some magic numbers
constexpr int num_subchunks = 10; // per chunk_size
@@ -217,7 +216,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
size_t const buffer_size =
reader_compression != compression_type::NONE
? total_source_size * estimated_compression_ratio + header_size
-: std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk);
+: std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
+    num_extra_delimiters;
rmm::device_uvector<char> buffer(buffer_size, stream);
device_span<char> bufspan(buffer);
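
A worked instance of the uncompressed-path sizing above, with made-up numbers (the surrounding constants are elided from this hunk, so every value below is an assumption): the point is that num_extra_delimiters now pads the allocated buffer rather than the requested chunk_size.

#include <algorithm>
#include <cstddef>

int main()
{
  std::size_t const total_source_size        = 8'000'000;  // e.g. five ~1.6 MB files
  std::size_t const chunk_size               = 2'000'000;
  std::size_t const num_subchunks_prealloced = 3;          // assumed
  std::size_t const size_per_subchunk        = 200'000;    // assumed
  std::size_t const num_extra_delimiters     = 4;          // sources.size() - 1

  // Uncompressed path of the buffer_size expression in the diff:
  auto const buffer_size =
    std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
    num_extra_delimiters;

  return buffer_size == 2'600'004 ? 0 : 1;  // chunk + slack + one '\n' per boundary
}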

110 changes: 107 additions & 3 deletions cpp/tests/io/json_chunked_reader.cpp
@@ -24,11 +24,19 @@

#include <rmm/resource_ref.hpp>

#include <fstream>
#include <string>
#include <vector>

/**
* @brief Base test fixture for JSON reader tests
*/
struct JsonReaderTest : public cudf::test::BaseFixture {};

cudf::test::TempDirTestEnvironment* const temp_env =
static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

// function to extract first delimiter in the string in each chunk,
// collate together and form byte_range for each chunk,
// parse separately.
@@ -41,7 +49,6 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
{
using namespace cudf::io::json::detail;
using cudf::size_type;
-// assuming single source.
size_t total_source_size = 0;
for (auto const& source : sources) {
total_source_size += source->size();
@@ -77,7 +84,9 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
std::vector<cudf::io::table_with_metadata> tables;
// Process each chunk in parallel.
for (auto const& [chunk_start, chunk_end] : record_ranges) {
-if (chunk_start == -1 or chunk_end == -1) continue;
+if (chunk_start == -1 or chunk_end == -1 or
+    static_cast<size_t>(chunk_start) >= total_source_size)
+  continue;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
@@ -87,7 +96,7 @@ std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
return tables;
}
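
For intuition, a host-side sketch of the chunking scheme this helper implements (simplified: a single in-memory string instead of datasources, with the first '\n' at or after each chunk boundary taken as the record split; the function name and shapes are illustrative, not the test's exact code):

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

std::vector<std::pair<std::size_t, std::size_t>> record_ranges(std::string const& text,
                                                               std::size_t chunk_size)
{
  // Each record range starts just past the first delimiter found at or after
  // a chunk boundary, so every JSON line lands in exactly one range.
  std::vector<std::size_t> starts{0};
  for (std::size_t off = chunk_size; off < text.size(); off += chunk_size) {
    auto const pos = text.find('\n', off);
    if (pos == std::string::npos) break;
    starts.push_back(pos + 1);
  }
  std::vector<std::pair<std::size_t, std::size_t>> ranges;
  for (std::size_t i = 0; i < starts.size(); ++i) {
    auto const end = (i + 1 < starts.size()) ? starts[i + 1] : text.size();
    if (starts[i] < end) ranges.emplace_back(starts[i], end);  // skip empty ranges
  }
  return ranges;
}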

-TEST_F(JsonReaderTest, ByteRange)
+TEST_F(JsonReaderTest, ByteRange_SingleSource)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
@@ -126,3 +135,98 @@ TEST_F(JsonReaderTest, ByteRange)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
}
}

TEST_F(JsonReaderTest, ReadCompleteFiles)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json";
{
std::ofstream outfile(filename, std::ofstream::out);
outfile << json_string;
}

constexpr int num_sources = 5;
std::vector<std::string> filepaths(num_sources, filename);

cudf::io::json_reader_options in_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths})
.lines(true)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL);

cudf::io::table_with_metadata result = cudf::io::read_json(in_options);

std::vector<cudf::io::table_with_metadata> part_tables;
for (auto filepath : filepaths) {
cudf::io::json_reader_options part_in_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepath})
.lines(true)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL);

part_tables.push_back(cudf::io::read_json(part_in_options));
}

auto part_table_views = std::vector<cudf::table_view>(part_tables.size());
std::transform(part_tables.begin(), part_tables.end(), part_table_views.begin(), [](auto& table) {
return table.tbl->view();
});

auto expected_result = cudf::concatenate(part_table_views);

CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result.tbl->view(), expected_result->view());
}

TEST_F(JsonReaderTest, ByteRange_MultiSource)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json";
{
std::ofstream outfile(filename, std::ofstream::out);
outfile << json_string;
}

constexpr int num_sources = 5;
std::vector<std::string> filepaths(num_sources, filename);

// Initialize parsing options (reading json lines)
cudf::io::json_reader_options json_lines_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths})
.lines(true)
.compression(cudf::io::compression_type::NONE)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL);

// Read full test data via existing, nested JSON lines reader
cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options);

auto file_paths = json_lines_options.get_source().filepaths();
std::vector<std::unique_ptr<cudf::io::datasource>> datasources;
for (auto& fp : file_paths) {
datasources.emplace_back(cudf::io::datasource::create(fp));
}

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) {
auto const tables = skeleton_for_parellel_chunk_reader(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
return table.tbl->view();
});
auto result = cudf::concatenate(table_views);

// Verify that the data read via chunked reader matches the data read via nested JSON reader
// cannot use EQUAL due to concatenate removing null mask
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
}
}