Merge pull request #17917 from vbotbuildovich/backport-pr-17839-v23.3.x-52

[v23.3.x] archival: Start housekeeping jobs only on a leader
Lazin authored Apr 23, 2024
2 parents 50b992b + b3310f7 commit 1017b79
Showing 3 changed files with 42 additions and 30 deletions.
57 changes: 33 additions & 24 deletions src/v/archival/ntp_archiver_service.cc
@@ -55,6 +55,7 @@
 #include <seastar/core/timed_out_error.hh>
 #include <seastar/core/when_all.hh>
 #include <seastar/coroutine/all.hh>
+#include <seastar/util/defer.hh>
 #include <seastar/util/log.hh>
 #include <seastar/util/noncopyable_function.hh>

@@ -225,9 +226,9 @@ ntp_archiver::ntp_archiver(
   , _next_housekeeping(_housekeeping_jitter())
   , _feature_table(parent.feature_table())
   , _local_segment_merger(
-      maybe_make_adjacent_segment_merger(*this, parent.log()->config()))
+      maybe_make_adjacent_segment_merger(*this, _parent.log()->config()))
   , _scrubber(maybe_make_scrubber(
-      *this, _remote, _feature_table.local(), parent.log()->config()))
+      *this, _remote, _feature_table.local(), _parent.log()->config()))
   , _manifest_upload_interval(
       config::shard_local_cfg()
         .cloud_storage_manifest_max_upload_interval_sec.bind())
@@ -248,7 +249,7 @@ ntp_archiver::ntp_archiver(
       archival_log.debug,
       "created ntp_archiver {} in term {}",
       _ntp,
-      _start_term);
+      _parent.term());
 }
 
 const cloud_storage::partition_manifest& ntp_archiver::manifest() const {
@@ -260,20 +261,6 @@ const cloud_storage::partition_manifest& ntp_archiver::manifest() const {
 }
 
 ss::future<> ntp_archiver::start() {
-    // Pre-sync the ntp_archiver to make sure that the adjacent segment merger
-    // can only see up to date manifest.
-    auto sync_timeout = config::shard_local_cfg()
-                          .cloud_storage_metadata_sync_timeout_ms.value();
-    co_await _parent.archival_meta_stm()->sync(sync_timeout);
-
-    bool is_leader = _parent.is_leader();
-    if (_local_segment_merger) {
-        _local_segment_merger->set_enabled(is_leader);
-    }
-    if (_scrubber) {
-        _scrubber->set_enabled(is_leader);
-    }
-
     if (_parent.get_ntp_config().is_read_replica_mode_enabled()) {
         ssx::spawn_with_gate(_gate, [this] {
             return sync_manifest_until_abort().then([this] {
@@ -315,13 +302,6 @@ void ntp_archiver::notify_leadership(std::optional<model::node_id> leader_id) {
     if (is_leader) {
         _leader_cond.signal();
     }
-    if (_local_segment_merger) {
-        _local_segment_merger->set_enabled(is_leader);
-    }
-
-    if (_scrubber) {
-        _scrubber->set_enabled(is_leader);
-    }
 }
 
 ss::future<> ntp_archiver::upload_until_abort() {
@@ -367,7 +347,36 @@ ss::future<> ntp_archiver::upload_until_abort() {
         if (!is_synced) {
            continue;
         }
 
+        vlog(_rtclog.debug, "upload loop synced in term {}", _start_term);
+        if (!may_begin_uploads()) {
+            continue;
+        }
+
+        if (_local_segment_merger) {
+            vlog(
+              _rtclog.debug,
+              "Enable adjacent segment merger in term {}",
+              _start_term);
+            _local_segment_merger->set_enabled(true);
+        }
+        if (_scrubber) {
+            vlog(_rtclog.debug, "Enable scrubber in term {}", _start_term);
+            _scrubber->set_enabled(true);
+        }
+        auto disable_hk_jobs = ss::defer([this] {
+            if (_local_segment_merger) {
+                vlog(
+                  _rtclog.debug,
+                  "Disable adjacent segment merger in term {}",
+                  _start_term);
+                _local_segment_merger->set_enabled(false);
+            }
+            if (_scrubber) {
+                vlog(_rtclog.debug, "Disable scrubber in term {}", _start_term);
+                _scrubber->set_enabled(false);
+            }
+        });
 
         co_await ss::with_scheduling_group(
           _conf->upload_scheduling_group,
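The new code in upload_until_abort() relies on Seastar's ss::defer (hence the added <seastar/util/defer.hh> include): defer builds a scope guard that runs its lambda when the enclosing scope is left, so housekeeping jobs enabled for the current term get switched back off however the loop iteration ends. Below is a minimal standalone sketch of that pattern, assuming only the Seastar headers and a C++20 compiler; housekeeping_job and the main() driver are illustrative stand-ins, not Redpanda types.

#include <seastar/util/defer.hh>

#include <iostream>

// Illustrative stand-in for a background job such as the adjacent segment
// merger or the scrubber; only the enabled flag matters for this sketch.
struct housekeeping_job {
    bool enabled = false;
    void set_enabled(bool e) { enabled = e; }
};

int main() {
    housekeeping_job merger;

    {
        // "Became leader": enable the job, and register a deferred action
        // that disables it again when this scope exits, whether control
        // falls through, returns early, or unwinds through an exception.
        merger.set_enabled(true);
        auto disable_on_exit = seastar::defer(
          [&]() noexcept { merger.set_enabled(false); });

        std::cout << "inside term, enabled: " << merger.enabled << "\n"; // 1
    } // deferred action runs here

    std::cout << "after term, enabled: " << merger.enabled << "\n"; // 0
    return 0;
}

Tying the enable/disable to the leader's upload loop in this way matches the removal of the direct set_enabled calls from ntp_archiver::start() and notify_leadership() above: the jobs now follow whether the archiver is actively uploading in its current term rather than raw leadership notifications.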
6 changes: 5 additions & 1 deletion src/v/cloud_storage_clients/s3_client.cc
@@ -415,7 +415,11 @@ ss::future<ResultT> parse_head_error_response(
           code, msg, ss::sstring(rid.data(), rid.size()), key().native());
         return ss::make_exception_future<ResultT>(err);
     } catch (...) {
-        vlog(s3_log.error, "!!error parse error {}", std::current_exception());
+        vlog(
+          s3_log.error,
+          "!!error parse error {}, header: {}",
+          std::current_exception(),
+          hdr);
         throw;
     }
 }
9 changes: 4 additions & 5 deletions tests/rptest/tests/cloud_storage_scrubber_test.py
@@ -185,7 +185,7 @@ def get_impacted_offset_ranges(self, ntpr: NTPR,
 
 
 class CloudStorageScrubberTest(RedpandaTest):
-    scrub_timeout = 90
+    scrub_timeout = 200
     partition_count = 3
     message_size = 16 * 1024  # 16KiB
     segment_size = 1024 * 1024  # 1MiB
@@ -203,12 +203,11 @@ def __init__(self, test_context):
         super().__init__(
             test_context=test_context,
             extra_rp_conf={
-                "cloud_storage_enable_scrubbing": True,
-                "cloud_storage_partial_scrub_interval_ms": 1000,
+                "cloud_storage_partial_scrub_interval_ms": 100,
                 "cloud_storage_full_scrub_interval_ms": 1000,
-                "cloud_storage_scrubbing_interval_jitter_ms": 100,
+                "cloud_storage_scrubbing_interval_jitter_ms": 50,
                 # Small quota forces partial scrubs
-                "cloud_storage_background_jobs_quota": 30,
+                "cloud_storage_background_jobs_quota": 40,
                 # Disable segment merging since it can reupload
                 # the deleted segment and remove the gap
                 "cloud_storage_enable_segment_merging": False,
