diff --git a/src/v/storage/compaction_reducers.cc b/src/v/storage/compaction_reducers.cc index 07ca8ca0a6f6c..718c98d3ba8ee 100644 --- a/src/v/storage/compaction_reducers.cc +++ b/src/v/storage/compaction_reducers.cc @@ -340,6 +340,9 @@ ss::future copy_data_segment_reducer::filter_and_append( ss::future copy_data_segment_reducer::operator()(model::record_batch b) { + if (_inject_failure) { + throw std::runtime_error("injected error"); + } const auto comp = b.header().attrs.compression(); if (!b.compressed()) { co_return co_await filter_and_append(comp, std::move(b)); diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index 9f2f7a10ecd79..c332ac7dd67e8 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -125,13 +125,15 @@ class copy_data_segment_reducer : public compaction_reducer { bool internal_topic, offset_delta_time apply_offset, model::offset segment_last_offset = model::offset{}, - compacted_index_writer* cidx = nullptr) + compacted_index_writer* cidx = nullptr, + bool inject_failure = false) : _should_keep_fn(std::move(f)) , _segment_last_offset(segment_last_offset) , _appender(a) , _compacted_idx(cidx) , _idx(index_state::make_empty_index(apply_offset)) - , _internal_topic(internal_topic) {} + , _internal_topic(internal_topic) + , _inject_failure(inject_failure) {} ss::future operator()(model::record_batch); storage::index_state end_of_stream() { return std::move(_idx); } @@ -161,6 +163,9 @@ class copy_data_segment_reducer : public compaction_reducer { /// We need to know if this is an internal topic to inform whether to /// index on non-raft-data batches bool _internal_topic; + + /// If set to true, will throw an exception on operator(). + bool _inject_failure; }; class index_rebuilder_reducer : public compaction_reducer { diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 4ad48e9d4168b..40a908162ef96 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -447,7 +447,7 @@ ss::future<> disk_log_impl::adjacent_merge_compact( *_probe, *_readers_cache, _manager.resources(), - storage::internal::should_apply_delta_time_offset(_feature_table)); + _feature_table); vlog( gclog.debug, @@ -545,7 +545,7 @@ ss::future disk_log_impl::sliding_window_compact( *_probe, *_readers_cache, _manager.resources(), - storage::internal::should_apply_delta_time_offset(_feature_table)); + _feature_table); vlog( gclog.debug, @@ -688,8 +688,8 @@ ss::future disk_log_impl::sliding_window_compact( *appender, compacted_idx_writer, *_probe, - storage::internal::should_apply_delta_time_offset( - _feature_table)); + storage::internal::should_apply_delta_time_offset(_feature_table), + _feature_table); } catch (...) { eptr = std::current_exception(); @@ -906,7 +906,7 @@ ss::future disk_log_impl::compact_adjacent_segments( *_probe, *_readers_cache, _manager.resources(), - storage::internal::should_apply_delta_time_offset(_feature_table)); + _feature_table); _probe->delete_segment(*replacement.get()); vlog(gclog.debug, "Final compacted segment {}", replacement); diff --git a/src/v/storage/fs_utils.cc b/src/v/storage/fs_utils.cc index 59c1056750b78..8419d223a39d9 100644 --- a/src/v/storage/fs_utils.cc +++ b/src/v/storage/fs_utils.cc @@ -106,7 +106,7 @@ segment_full_path segment_full_path::to_compacted_index() const { if (extension == ".log") { return with_extension(".compaction_index"); } else if (extension == ".log.compaction.staging") { - return with_extension("log.compaction.compaction_index"); + return with_extension(".log.compaction.compaction_index"); } else { vassert(false, "Unexpected extension {}", extension); } diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index ff994490b8517..420be1fbe94e2 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -186,7 +186,9 @@ ss::future deduplicate_segment( segment_appender& appender, compacted_index_writer& cmp_idx_writer, probe& probe, - offset_delta_time should_offset_delta_times) { + offset_delta_time should_offset_delta_times, + ss::sharded& feature_table, + bool inject_reader_failure) { auto read_holder = co_await seg->read_lock(); if (seg->is_closed()) { throw segment_closed_exception(); @@ -201,9 +203,10 @@ ss::future deduplicate_segment( seg->path().is_internal_topic(), should_offset_delta_times, seg->offsets().committed_offset, - &cmp_idx_writer); + &cmp_idx_writer, + inject_reader_failure); - auto new_idx = co_await rdr.consume( + auto new_idx = co_await std::move(rdr).consume( std::move(copy_reducer), model::no_timeout); new_idx.broker_timestamp = seg->index().broker_timestamp(); co_return new_idx; diff --git a/src/v/storage/segment_deduplication_utils.h b/src/v/storage/segment_deduplication_utils.h index 8f0e28c2c5c76..d3f3582b3ea30 100644 --- a/src/v/storage/segment_deduplication_utils.h +++ b/src/v/storage/segment_deduplication_utils.h @@ -56,6 +56,8 @@ ss::future deduplicate_segment( segment_appender& appender, compacted_index_writer& cmp_idx_writer, storage::probe& probe, - offset_delta_time should_offset_delta_times); + offset_delta_time should_offset_delta_times, + ss::sharded&, + bool inject_reader_failure = false); } // namespace storage diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 5505666f231b3..676009dafc3c0 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -373,7 +373,8 @@ ss::future do_copy_segment_data( storage::probe& pb, ss::rwlock::holder rw_lock_holder, storage_resources& resources, - offset_delta_time apply_offset) { + offset_delta_time apply_offset, + ss::sharded&) { // preserve broker_timestamp from the segment's index auto old_broker_timestamp = seg->index().broker_timestamp(); @@ -499,7 +500,8 @@ ss::future> do_self_compact_segment( storage::readers_cache& readers_cache, storage_resources& resources, offset_delta_time apply_offset, - ss::rwlock::holder read_holder) { + ss::rwlock::holder read_holder, + ss::sharded& feature_table) { vlog(gclog.trace, "self compacting segment {}", s->reader().path()); auto segment_generation = s->get_generation_id(); @@ -516,7 +518,13 @@ ss::future> do_self_compact_segment( auto staging_to_clean = scoped_file_tracker{ cfg.files_to_cleanup, {staging_file}}; auto idx = co_await do_copy_segment_data( - s, cfg, pb, std::move(read_holder), resources, apply_offset); + s, + cfg, + pb, + std::move(read_holder), + resources, + apply_offset, + feature_table); auto rdr_holder = co_await readers_cache.evict_segment_readers(s); @@ -645,7 +653,7 @@ ss::future self_compact_segment( storage::probe& pb, storage::readers_cache& readers_cache, storage_resources& resources, - offset_delta_time apply_offset) { + ss::sharded& feature_table) { if (s->has_appender()) { throw std::runtime_error(fmt::format( "Cannot compact an active segment. cfg:{} - segment:{}", cfg, s)); @@ -702,6 +710,7 @@ ss::future self_compact_segment( "Unexpected state {}", int(state)); auto sz_before = s->size_bytes(); + auto apply_offset = should_apply_delta_time_offset(feature_table); auto sz_after = co_await do_self_compact_segment( s, cfg, @@ -709,7 +718,8 @@ ss::future self_compact_segment( readers_cache, resources, apply_offset, - std::move(read_holder)); + std::move(read_holder), + feature_table); // compaction wasn't executed, return if (!sz_after) { co_return compaction_result(sz_before); diff --git a/src/v/storage/segment_utils.h b/src/v/storage/segment_utils.h index e3342a922de75..b47d49c02bfcc 100644 --- a/src/v/storage/segment_utils.h +++ b/src/v/storage/segment_utils.h @@ -39,7 +39,7 @@ ss::future self_compact_segment( storage::probe&, storage::readers_cache&, storage::storage_resources&, - offset_delta_time apply_offset); + ss::sharded& feature_table); /// \brief, rebuilds a given segment's compacted index. This method acquires /// locks on the segment. diff --git a/src/v/storage/tests/segment_deduplication_test.cc b/src/v/storage/tests/segment_deduplication_test.cc index 9190c8899222e..58a1e06e9c507 100644 --- a/src/v/storage/tests/segment_deduplication_test.cc +++ b/src/v/storage/tests/segment_deduplication_test.cc @@ -9,6 +9,7 @@ #include "gmock/gmock.h" #include "random/generators.h" +#include "storage/chunk_cache.h" #include "storage/disk_log_impl.h" #include "storage/key_offset_map.h" #include "storage/segment_deduplication_utils.h" @@ -27,6 +28,7 @@ using namespace storage; namespace { ss::abort_source never_abort; +ss::sharded feature_table; } // anonymous namespace // Builds a segment layout: @@ -165,6 +167,13 @@ TEST(BuildOffsetMap, TestBuildSimpleMap) { model::offset{30}, ss::default_priority_class(), never_abort); probe pb; + feature_table.start().get(); + feature_table + .invoke_on_all( + [](features::feature_table& f) { f.testing_activate_all(); }) + .get(); + auto defer = ss::defer([] { feature_table.stop().get(); }); + // Self-compact each segment so we're left with compaction indices. This is // a requirement to build the offset map. for (auto& seg : segs) { @@ -175,7 +184,7 @@ TEST(BuildOffsetMap, TestBuildSimpleMap) { pb, disk_log.readers(), disk_log.resources(), - offset_delta_time::yes) + feature_table) .get(); } @@ -253,3 +262,71 @@ TEST(BuildOffsetMap, TestBuildMapWithMissingCompactedIndex) { ASSERT_TRUE(ss::file_exists(idx_path.string()).get()); } } + +// Regression test that ensures that segment deduplication doesn't crash +// Redpanda when it hits an error on the read path. +TEST(DeduplicateSegmentsTest, TestBadReader) { + storage::disk_log_builder b; + build_segments( + b, + /*num_segs=*/5, + /*records_per_seg=*/10, + /*start_offset=*/0, + /*mark_compacted=*/false); + auto cleanup = ss::defer([&] { b.stop().get(); }); + auto& disk_log = b.get_disk_log_impl(); + auto& segs = disk_log.segments(); + + // Build an offset map for our log. + compaction_config cfg( + model::offset{0}, ss::default_priority_class(), never_abort); + simple_key_offset_map all_segs_map(50); + auto map_start_offset = build_offset_map( + cfg, + segs, + disk_log.stm_manager(), + disk_log.resources(), + disk_log.get_probe(), + all_segs_map) + .get(); + ASSERT_EQ(map_start_offset(), 0); + + // Set up an appender and index writer. + auto first_seg = segs[0]; + const auto tmpname = first_seg->reader().path().to_compaction_staging(); + auto appender = storage::internal::make_segment_appender( + tmpname, + segment_appender::write_behind_memory + / storage::internal::chunks().chunk_size(), + std::nullopt, + cfg.iopc, + disk_log.resources(), + cfg.sanitizer_config) + .get(); + const auto cmp_idx_tmpname = tmpname.to_compacted_index(); + auto compacted_idx_writer = make_file_backed_compacted_index( + cmp_idx_tmpname, + cfg.iopc, + true, + disk_log.resources(), + cfg.sanitizer_config); + auto close = ss::defer([&] { + compacted_idx_writer.close().get(); + appender->close().get(); + }); + + // Test that injecting a failure only throws an exception, i.e. no crashes! + EXPECT_THROW( + deduplicate_segment( + cfg, + all_segs_map, + first_seg, + *appender, + compacted_idx_writer, + disk_log.get_probe(), + storage::internal::should_apply_delta_time_offset(b.feature_table()), + b.feature_table(), + /*inject_reader_failure=*/true) + .get(), + std::runtime_error); +} diff --git a/src/v/storage/tests/utils/disk_log_builder.h b/src/v/storage/tests/utils/disk_log_builder.h index 64ee7e44243e8..860838d1b5a3a 100644 --- a/src/v/storage/tests/utils/disk_log_builder.h +++ b/src/v/storage/tests/utils/disk_log_builder.h @@ -376,6 +376,10 @@ class disk_log_builder { void set_time(model::timestamp t) { _ts_cursor = t; } + ss::sharded& feature_table() { + return _feature_table; + } + private: template auto consume_impl(Consumer c, log_reader_config config) {