Skip to content

Commit

Permalink
Merge pull request #16160 from dotnwat/seg-manifest-oom
Browse files Browse the repository at this point in the history
cloud_storage: use fragvec to hold replaced segments
  • Loading branch information
dotnwat authored Jan 19, 2024
2 parents 0835c3f + 33a5e48 commit ed4f529
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 40 deletions.
11 changes: 6 additions & 5 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -841,14 +841,13 @@ bool partition_manifest::advance_start_offset(model::offset new_start_offset) {
return false;
}

std::vector<partition_manifest::lw_segment_meta>
fragmented_vector<partition_manifest::lw_segment_meta>
partition_manifest::lw_replaced_segments() const {
return _replaced;
return _replaced.copy();
}

std::vector<segment_meta> partition_manifest::replaced_segments() const {
std::vector<segment_meta> res;
res.reserve(_replaced.size());
fragmented_vector<segment_meta> partition_manifest::replaced_segments() const {
fragmented_vector<segment_meta> res;
for (const auto& s : _replaced) {
res.push_back(lw_segment_meta::convert(s));
}
Expand Down Expand Up @@ -2672,6 +2671,8 @@ partition_manifest_serde_from_partition_manifest(partition_manifest const& m)
(([&]<typename Src>(auto& dest, Src const& src) {
if constexpr (std::is_same_v<Src, segment_meta_cstore>) {
dest = src.to_iobuf();
} else if constexpr (reflection::is_fragmented_vector<Src>) {
dest = src.copy();
} else {
dest = src;
}
Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_storage/partition_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class partition_manifest : public base_manifest {
using value = segment_meta;
using segment_map = segment_meta_cstore;
using spillover_manifest_map = segment_meta_cstore;
using replaced_segments_list = std::vector<lw_segment_meta>;
using replaced_segments_list = fragmented_vector<lw_segment_meta>;
using const_iterator = segment_map::const_iterator;

/// Generate segment name to use in the cloud
Expand Down Expand Up @@ -462,11 +462,11 @@ class partition_manifest : public base_manifest {
const_iterator segment_containing(kafka::offset o) const;

// Return collection of segments that were replaced in lightweight format.
std::vector<partition_manifest::lw_segment_meta>
fragmented_vector<partition_manifest::lw_segment_meta>
lw_replaced_segments() const;

/// Return collection of segments that were replaced by newer segments.
std::vector<segment_meta> replaced_segments() const;
fragmented_vector<segment_meta> replaced_segments() const;

/// Return the number of replaced segments currently awaiting deletion.
size_t replaced_segments_count() const;
Expand Down
60 changes: 29 additions & 31 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1491,44 +1491,42 @@ void archival_metadata_stm::apply_reset_scrubbing_metadata() {
_manifest->reset_scrubbing_metadata();
}

std::vector<cloud_storage::partition_manifest::lw_segment_meta>
fragmented_vector<cloud_storage::partition_manifest::lw_segment_meta>
archival_metadata_stm::get_segments_to_cleanup() const {
// Include replaced segments in the backlog
using lw_segment_meta = cloud_storage::partition_manifest::lw_segment_meta;
std::vector<lw_segment_meta> backlog = _manifest->lw_replaced_segments();
const fragmented_vector<lw_segment_meta> source_backlog
= _manifest->lw_replaced_segments();

// Make sure that the 'replaced' list doesn't have any references to active
// segments. This should not happen, but the check protects us from data
// loss in case there are bugs elsewhere.
auto backlog_size = backlog.size();
backlog.erase(
std::remove_if(
backlog.begin(),
backlog.end(),
[this](const lw_segment_meta& m) {
auto it = _manifest->find(m.base_offset);
if (it == _manifest->end()) {
return false;
}
auto m_name = _manifest->generate_remote_segment_name(
cloud_storage::partition_manifest::lw_segment_meta::convert(m));
auto s_name = _manifest->generate_remote_segment_name(*it);
// The segment will have the same path as the one we have in the
// manifest in S3, so if we delete it the data will be lost.
if (m_name == s_name) {
vlog(
_logger.error,
"The replaced segment name {} collides with the segment "
"{} "
"in the manifest. It will be removed to prevent the data "
"loss.",
m_name,
s_name);
return true;
}
return false;
}),
backlog.end());
const auto backlog_size = source_backlog.size();
fragmented_vector<lw_segment_meta> backlog;
for (const auto& m : source_backlog) {
auto it = _manifest->find(m.base_offset);
if (it == _manifest->end()) {
backlog.push_back(m);
continue;
}
auto m_name = _manifest->generate_remote_segment_name(
cloud_storage::partition_manifest::lw_segment_meta::convert(m));
auto s_name = _manifest->generate_remote_segment_name(*it);
// The segment will have the same path as the one we have in the
// manifest in S3, so if we delete it the data will be lost.
if (m_name == s_name) {
vlog(
_logger.error,
"The replaced segment name {} collides with the segment "
"{} "
"in the manifest. It will be removed to prevent the data "
"loss.",
m_name,
s_name);
continue;
}
backlog.push_back(m);
}

if (backlog.size() < backlog_size) {
vlog(
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> {

// Return the list of all segments that have to be
// removed from S3.
std::vector<cloud_storage::partition_manifest::lw_segment_meta>
fragmented_vector<cloud_storage::partition_manifest::lw_segment_meta>
get_segments_to_cleanup() const;

/// Create batch builder that can be used to combine and replicate multiple
Expand Down

0 comments on commit ed4f529

Please sign in to comment.