diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc
index 313f6bd3cff0d..ecc5a666a669a 100644
--- a/src/v/cloud_storage/remote_partition.cc
+++ b/src/v/cloud_storage/remote_partition.cc
@@ -401,69 +401,58 @@ class partition_record_batch_reader_impl final
                   "{}",
                   _seg_reader->config());
-                try {
-                    auto result = co_await _seg_reader->read_some(
-                      deadline, *_ot_state);
-                    throw_on_external_abort();
+                auto result = co_await _seg_reader->read_some(
+                  deadline, *_ot_state);
+                throw_on_external_abort();
-                    if (!result) {
-                        vlog(
-                          _ctxlog.debug,
-                          "Error while reading from stream '{}'",
-                          result.error());
-                        co_await set_end_of_stream();
-                        throw std::system_error(result.error());
-                    }
-                    data_t d = std::move(result.value());
-                    for (const auto& batch : d) {
-                        _partition->_probe.add_bytes_read(
-                          batch.header().size_bytes);
-                        _partition->_probe.add_records_read(
-                          batch.record_count());
-                    }
-                    if (
-                      _first_produced_offset == model::offset{} && !d.empty()) {
-                        _first_produced_offset = d.front().base_offset();
-                    }
-                    co_return storage_t{std::move(d)};
-                } catch (const stuck_reader_exception& ex) {
-                    throw_on_external_abort();
+                if (!result) {
                     vlog(
-                      _ctxlog.warn,
-                      "stuck reader: current rp offset: {}, max rp offset: {}",
-                      ex.rp_offset,
-                      _seg_reader->max_rp_offset());
-
-                    // If the reader is stuck because of a mismatch between
-                    // segment data and manifest entry, set reader to EOF and
-                    // try to reset reader on the next loop iteration. We only
-                    // do this when the reader has not reached eof. For example,
-                    // the segment ends at offset 10 but the manifest has max
-                    // offset at 11 for the segment, with offset 11 actually
-                    // present in the next segment. When the reader is stuck,
-                    // the current offset will be 10 which we will not be able
-                    // to read from. Switching to the next segment should enable
-                    // reads to proceed.
-                    if (
-                      model::next_offset(ex.rp_offset)
-                        >= _next_segment_base_offset
-                      && !_seg_reader->is_eof()) {
-                        vlog(
-                          _ctxlog.info,
-                          "mismatch between current segment end and manifest "
-                          "data: current rp offset {}, manifest max rp offset "
-                          "{}, next segment base offset {}, reader is EOF: {}. "
-                          "set EOF on reader and try to "
-                          "reset",
-                          ex.rp_offset,
-                          _seg_reader->max_rp_offset(),
-                          _next_segment_base_offset,
-                          _seg_reader->is_eof());
-                        _seg_reader->set_eof();
-                        continue;
+                      _ctxlog.debug,
+                      "Error while reading from stream '{}'",
+                      result.error());
+                    co_await set_end_of_stream();
+                    throw std::system_error(result.error());
+                }
+                data_t d = std::move(result.value());
+                for (const auto& batch : d) {
+                    _partition->_probe.add_bytes_read(
+                      batch.header().size_bytes);
+                    _partition->_probe.add_records_read(batch.record_count());
+                }
+                if (_first_produced_offset == model::offset{} && !d.empty()) {
+                    _first_produced_offset = d.front().base_offset();
+                } else {
+                    auto current_ko = _ot_state->from_log_offset(
+                      _seg_reader->current_rp_offset());
+                    vlog(
+                      _ctxlog.debug,
+                      "No results, current rp offset: {}, current kafka "
+                      "offset: {}, max rp offset: "
+                      "{}",
+                      _seg_reader->current_rp_offset(),
+                      current_ko,
+                      _seg_reader->config().max_offset);
+                    if (current_ko > _seg_reader->config().max_offset) {
+                        // The reader overshot the offset. If we don't
+                        // reset the stream, the loop inside the
+                        // record_batch_reader will keep calling this method
+                        // again and again. We would return an empty
+                        // result every time because the current offset
+                        // is past the max allowed offset. Resetting the
+                        // segment reader fixes the issue.
+                        //
+                        // We can get into a situation where the current
+                        // reader returns an empty result in several cases:
+                        // - we reached max_offset (covered here)
+                        // - we reached the end of the stream (covered above,
+                        //   right after the 'read_some' call)
+                        //
+                        // If we reached max-bytes then the result won't be
+                        // empty; it will have at least one record batch.
+                        co_await set_end_of_stream();
                     }
-                    throw;
                 }
+                co_return storage_t{std::move(d)};
             }
         } catch (const ss::gate_closed_exception&) {
             vlog(
diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h
index 7cde77af091c4..7ca4bd9dbd613 100644
--- a/src/v/cloud_storage/remote_segment.h
+++ b/src/v/cloud_storage/remote_segment.h
@@ -37,19 +37,6 @@ namespace cloud_storage {
 
-class stuck_reader_exception final : public std::runtime_error {
-public:
-    stuck_reader_exception(
-      model::offset cur_rp_offset,
-      size_t cur_bytes_consumed,
-      ss::sstring context)
-      : std::runtime_error{context}
-      , rp_offset(cur_rp_offset)
-      , bytes_consumed(cur_bytes_consumed) {}
-    const model::offset rp_offset;
-    const size_t bytes_consumed;
-};
-
 std::filesystem::path
 generate_index_path(const cloud_storage::remote_segment_path& p);
diff --git a/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc b/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc
index b4b879bf8ef53..605ccfdce181c 100644
--- a/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc
+++ b/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc
@@ -14,6 +14,7 @@
 #include "cloud_storage/tests/cloud_storage_fixture.h"
 #include "cloud_storage/tests/s3_imposter.h"
 #include "cloud_storage/tests/util.h"
+#include "model/record_batch_types.h"
 
 #include
 
@@ -337,6 +338,49 @@ FIXTURE_TEST(
     }
 }
 
+FIXTURE_TEST(
+  test_remote_partition_scan_incrementally_random_with_tx_fence_random_lso,
+  cloud_storage_fixture) {
+    vlog(
+      test_log.info,
+      "Seed used for read workload: {}",
+      random_generators::internal::seed);
+
+    constexpr int num_segments = 1000;
+    const auto [segment_layout, num_data_batches] = generate_segment_layout(
+      num_segments, 42, false);
+    auto segments = setup_s3_imposter(*this, segment_layout);
+    auto base = segments[0].base_offset;
+    auto max = segments.back().max_offset;
+    vlog(test_log.debug, "offset range: {}-{}", base, max);
+
+    try {
+        auto headers_read
+          = scan_remote_partition_incrementally_with_closest_lso(
+            *this, base, max, 5, 5);
+        vlog(test_log.debug, "{} record batches consumed", headers_read.size());
+        model::offset expected_offset{0};
+        size_t ix_header = 0;
+        for (const auto& ix_seg : segment_layout) {
+            for (const auto& batch : ix_seg) {
+                if (batch.type == model::record_batch_type::tx_fence) {
+                    expected_offset++;
+                } else if (batch.type == model::record_batch_type::raft_data) {
+                    auto header = headers_read[ix_header];
+                    BOOST_REQUIRE_EQUAL(expected_offset, header.base_offset);
+                    expected_offset = header.last_offset() + model::offset(1);
+                    ix_header++;
+                } else {
+                    // raft_configuration or archival_metadata:
+                    // no need to update expected_offset or ix_header
+                }
+            }
+        }
+    } catch (const download_exception& ex) {
+        vlog(test_log.warn, "timeout connecting to s3 imposter: {}", ex.what());
+    }
+}
+
 FIXTURE_TEST(
   test_remote_partition_scan_incrementally_random_with_reuploads,
   cloud_storage_fixture) {
diff --git a/src/v/cloud_storage/tests/util.cc b/src/v/cloud_storage/tests/util.cc
index 67a3be0276ca8..aa1cd1ac8e6ac 100644
--- a/src/v/cloud_storage/tests/util.cc
+++ b/src/v/cloud_storage/tests/util.cc
@@ -10,6 +10,7 @@
  */
 
 #include "cloud_storage/tests/util.h"
+#include "model/record.h" #include "model/record_batch_types.h" #include @@ -824,6 +825,89 @@ scan_result scan_remote_partition( }; } +std::vector +scan_remote_partition_incrementally_with_closest_lso( + cloud_storage_fixture& imposter, + model::offset base, + model::offset max, + size_t maybe_max_segments, + size_t maybe_max_readers) { + ss::lowres_clock::update(); + auto conf = imposter.get_configuration(); + static auto bucket = cloud_storage_clients::bucket_name("bucket"); + if (maybe_max_segments) { + config::shard_local_cfg() + .cloud_storage_max_materialized_segments_per_shard.set_value( + maybe_max_segments); + } + if (maybe_max_readers) { + config::shard_local_cfg() + .cloud_storage_max_segment_readers_per_shard.set_value( + maybe_max_readers); + } + auto manifest = hydrate_manifest(imposter.api.local(), bucket); + partition_probe probe(manifest.get_ntp()); + + auto manifest_view = ss::make_shared( + imposter.api, imposter.cache, manifest, bucket); + + auto partition = ss::make_shared( + manifest_view, + imposter.api.local(), + imposter.cache.local(), + bucket, + probe); + + auto partition_stop = ss::defer([&partition] { partition->stop().get(); }); + + partition->start().get(); + + std::vector headers; + + storage::log_reader_config reader_config( + base, model::next_offset(base), ss::default_priority_class()); + + // starting max_bytes + reader_config.max_bytes = 1; + + auto next = base; + + int num_fetches = 0; + while (next < max) { + reader_config.start_offset = next; + reader_config.max_offset = model::next_offset(next); + vlog(test_util_log.info, "reader_config {}", reader_config); + auto reader = partition->make_reader(reader_config).get().reader; + auto headers_read + = reader.consume(test_consumer(), model::no_timeout).get(); + if (headers_read.empty()) { + // If the reader returned the empty result then the offset + // corresponds to tx-batch. Our own tx-batches looks like offset + // gaps to the client. We're always adding tx-batches with only one + // record so we can increment the 'next' offset and continue. + next = model::next_offset(next); + vlog( + test_util_log.info, + "Reader config: {} produced empty result, next offset set to {}", + reader_config, + next); + // test is prepared to see the gaps in place of tx-fence batches + continue; + } + BOOST_REQUIRE(headers_read.size() == 1); + vlog(test_util_log.info, "header {}", headers_read.front()); + next = headers_read.back().last_offset() + model::offset(1); + std::copy( + headers_read.begin(), + headers_read.end(), + std::back_inserter(headers)); + num_fetches++; + } + BOOST_REQUIRE(num_fetches > 0); + vlog(test_util_log.info, "{} fetch operations performed", num_fetches); + return headers; +} + void reupload_compacted_segments( cloud_storage_fixture& fixture, cloud_storage::partition_manifest& m, diff --git a/src/v/cloud_storage/tests/util.h b/src/v/cloud_storage/tests/util.h index f29f5a9b86518..a247b1e74e949 100644 --- a/src/v/cloud_storage/tests/util.h +++ b/src/v/cloud_storage/tests/util.h @@ -218,4 +218,14 @@ std::vector replace_segments( model::offset_delta base_delta, const std::vector>& batches); +/// Read batches by one using max_bytes=1 and set max_offset to closes +/// value in the 'possible_lso_values' list. +std::vector +scan_remote_partition_incrementally_with_closest_lso( + cloud_storage_fixture& imposter, + model::offset base, + model::offset max, + size_t maybe_max_segments, + size_t maybe_max_readers); + } // namespace cloud_storage