storage: fix windowed compaction error handling
We were previously using the l-value variant of
model::record_batch_reader::consume(), which doesn't have a built-in
call to finally(). For a log_reader, this meant that in some scenarios
(e.g. if the reducer were to fail) we could end up not calling
finally(), and therefore wouldn't clear the underlying segment reader.

This is exactly what happened, as we hit:

std::runtime_error (lz4f_compressend:ERROR_dstMaxSize_tooSmall)

errors as we compressed/decompressed batches, and then ultimately
crashed because the log_reader was not closed:

Assert failure: (.../redpanda/redpanda/src/v/storage/log_reader.h:141) '!_iterator.reader' log reader destroyed with live reader"
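
For context, here is a standalone sketch of the failure mode. The reader/reader_impl types below are hypothetical stand-ins, not the actual record_batch_reader or log_reader code: the point is only that an l-value-qualified consume() cannot guarantee cleanup when the consumer throws, while an r-value-qualified overload that takes ownership of the reader can finalize it unconditionally. The one-line fix below switches the call site to the r-value overload via std::move(rdr).

    // Standalone illustration of the bug class -- hypothetical types,
    // not the actual Redpanda implementation.
    #include <iostream>
    #include <memory>
    #include <stdexcept>
    #include <utility>

    struct reader_impl {
        bool closed = false;
        void finalize() { closed = true; }
        ~reader_impl() {
            // Mirrors the vassert in log_reader.h: destroying a reader
            // that was never finalized is a fatal bug.
            if (!closed) {
                std::cerr << "BUG: reader destroyed with live reader\n";
            }
        }
    };

    class reader {
        std::unique_ptr<reader_impl> _impl = std::make_unique<reader_impl>();

    public:
        // l-value variant: no built-in cleanup. If the consumer throws,
        // the impl goes out of scope without ever being finalized.
        template<typename Consumer>
        void consume(Consumer c) & {
            c();
            _impl->finalize();
        }

        // r-value variant: takes ownership and finalizes unconditionally,
        // even when the consumer fails.
        template<typename Consumer>
        void consume(Consumer c) && {
            auto impl = std::move(_impl);
            try {
                c();
            } catch (...) {
                impl->finalize();
                throw;
            }
            impl->finalize();
        }
    };

    int main() {
        auto failing_consumer = [] {
            throw std::runtime_error("compression error");
        };
        try {
            reader r;
            // With the l-value variant, this would leave the impl live.
            std::move(r).consume(failing_consumer);
        } catch (const std::exception& e) {
            std::cout << "caught: " << e.what() << " (reader was closed)\n";
        }
    }
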
andrwng committed Mar 7, 2024
1 parent 958aaf7 commit cc06447
Showing 2 changed files with 70 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/v/storage/segment_deduplication_utils.cc
@@ -227,7 +227,7 @@ ss::future<index_state> deduplicate_segment(
&cmp_idx_writer,
inject_reader_failure);

-  auto new_idx = co_await rdr.consume(
+  auto new_idx = co_await std::move(rdr).consume(
std::move(copy_reducer), model::no_timeout);
new_idx.broker_timestamp = seg->index().broker_timestamp();
co_return new_idx;
69 changes: 69 additions & 0 deletions src/v/storage/tests/segment_deduplication_test.cc
@@ -9,6 +9,7 @@

#include "gmock/gmock.h"
#include "random/generators.h"
#include "storage/chunk_cache.h"
#include "storage/disk_log_impl.h"
#include "storage/key_offset_map.h"
#include "storage/segment_deduplication_utils.h"
@@ -261,3 +262,71 @@ TEST(BuildOffsetMap, TestBuildMapWithMissingCompactedIndex) {
ASSERT_TRUE(ss::file_exists(idx_path.string()).get());
}
}

// Regression test that ensures that segment deduplication doesn't crash
// Redpanda when it hits an error on the read path.
TEST(DeduplicateSegmentsTest, TestBadReader) {
storage::disk_log_builder b;
build_segments(
b,
/*num_segs=*/5,
/*records_per_seg=*/10,
/*start_offset=*/0,
/*mark_compacted=*/false);
auto cleanup = ss::defer([&] { b.stop().get(); });
auto& disk_log = b.get_disk_log_impl();
auto& segs = disk_log.segments();

// Build an offset map for our log.
compaction_config cfg(
model::offset{0}, ss::default_priority_class(), never_abort);
simple_key_offset_map all_segs_map(50);
auto map_start_offset = build_offset_map(
cfg,
segs,
disk_log.stm_manager(),
disk_log.resources(),
disk_log.get_probe(),
all_segs_map)
.get();
ASSERT_EQ(map_start_offset(), 0);

// Set up an appender and index writer.
auto first_seg = segs[0];
const auto tmpname = first_seg->reader().path().to_compaction_staging();
auto appender = storage::internal::make_segment_appender(
tmpname,
segment_appender::write_behind_memory
/ storage::internal::chunks().chunk_size(),
std::nullopt,
cfg.iopc,
disk_log.resources(),
cfg.sanitizer_config)
.get();
const auto cmp_idx_tmpname = tmpname.to_compacted_index();
auto compacted_idx_writer = make_file_backed_compacted_index(
cmp_idx_tmpname,
cfg.iopc,
true,
disk_log.resources(),
cfg.sanitizer_config);
auto close = ss::defer([&] {
compacted_idx_writer.close().get();
appender->close().get();
});

// Test that injecting a failure only throws an exception, i.e. no crashes!
EXPECT_THROW(
deduplicate_segment(
cfg,
all_segs_map,
first_seg,
*appender,
compacted_idx_writer,
disk_log.get_probe(),
storage::internal::should_apply_delta_time_offset(b.feature_table()),
b.feature_table(),
/*inject_reader_failure=*/true)
.get(),
std::runtime_error);
}
