diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index f75b8ad483409..c95230f3348ff 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -55,6 +55,7 @@ #include #include #include +#include #include #include @@ -225,9 +226,9 @@ ntp_archiver::ntp_archiver( , _next_housekeeping(_housekeeping_jitter()) , _feature_table(parent.feature_table()) , _local_segment_merger( - maybe_make_adjacent_segment_merger(*this, parent.log()->config())) + maybe_make_adjacent_segment_merger(*this, _parent.log()->config())) , _scrubber(maybe_make_scrubber( - *this, _remote, _feature_table.local(), parent.log()->config())) + *this, _remote, _feature_table.local(), _parent.log()->config())) , _manifest_upload_interval( config::shard_local_cfg() .cloud_storage_manifest_max_upload_interval_sec.bind()) @@ -248,7 +249,7 @@ ntp_archiver::ntp_archiver( archival_log.debug, "created ntp_archiver {} in term {}", _ntp, - _start_term); + _parent.term()); } const cloud_storage::partition_manifest& ntp_archiver::manifest() const { @@ -260,20 +261,6 @@ const cloud_storage::partition_manifest& ntp_archiver::manifest() const { } ss::future<> ntp_archiver::start() { - // Pre-sync the ntp_archiver to make sure that the adjacent segment merger - // can only see up to date manifest. - auto sync_timeout = config::shard_local_cfg() - .cloud_storage_metadata_sync_timeout_ms.value(); - co_await _parent.archival_meta_stm()->sync(sync_timeout); - - bool is_leader = _parent.is_leader(); - if (_local_segment_merger) { - _local_segment_merger->set_enabled(is_leader); - } - if (_scrubber) { - _scrubber->set_enabled(is_leader); - } - if (_parent.get_ntp_config().is_read_replica_mode_enabled()) { ssx::spawn_with_gate(_gate, [this] { return sync_manifest_until_abort().then([this] { @@ -315,13 +302,6 @@ void ntp_archiver::notify_leadership(std::optional leader_id) { if (is_leader) { _leader_cond.signal(); } - if (_local_segment_merger) { - _local_segment_merger->set_enabled(is_leader); - } - - if (_scrubber) { - _scrubber->set_enabled(is_leader); - } } ss::future<> ntp_archiver::upload_until_abort() { @@ -367,7 +347,36 @@ ss::future<> ntp_archiver::upload_until_abort() { if (!is_synced) { continue; } + vlog(_rtclog.debug, "upload loop synced in term {}", _start_term); + if (!may_begin_uploads()) { + continue; + } + + if (_local_segment_merger) { + vlog( + _rtclog.debug, + "Enable adjacent segment merger in term {}", + _start_term); + _local_segment_merger->set_enabled(true); + } + if (_scrubber) { + vlog(_rtclog.debug, "Enable scrubber in term {}", _start_term); + _scrubber->set_enabled(true); + } + auto disable_hk_jobs = ss::defer([this] { + if (_local_segment_merger) { + vlog( + _rtclog.debug, + "Disable adjacent segment merger in term {}", + _start_term); + _local_segment_merger->set_enabled(false); + } + if (_scrubber) { + vlog(_rtclog.debug, "Disable scrubber in term {}", _start_term); + _scrubber->set_enabled(false); + } + }); co_await ss::with_scheduling_group( _conf->upload_scheduling_group, diff --git a/src/v/cloud_storage_clients/s3_client.cc b/src/v/cloud_storage_clients/s3_client.cc index e8d32ac6963ab..2ea6bb7f3096a 100644 --- a/src/v/cloud_storage_clients/s3_client.cc +++ b/src/v/cloud_storage_clients/s3_client.cc @@ -415,7 +415,11 @@ ss::future parse_head_error_response( code, msg, ss::sstring(rid.data(), rid.size()), key().native()); return ss::make_exception_future(err); } catch (...) { - vlog(s3_log.error, "!!error parse error {}", std::current_exception()); + vlog( + s3_log.error, + "!!error parse error {}, header: {}", + std::current_exception(), + hdr); throw; } } diff --git a/tests/rptest/tests/cloud_storage_scrubber_test.py b/tests/rptest/tests/cloud_storage_scrubber_test.py index 2e688bb262dff..02eaf84010b33 100644 --- a/tests/rptest/tests/cloud_storage_scrubber_test.py +++ b/tests/rptest/tests/cloud_storage_scrubber_test.py @@ -185,7 +185,7 @@ def get_impacted_offset_ranges(self, ntpr: NTPR, class CloudStorageScrubberTest(RedpandaTest): - scrub_timeout = 90 + scrub_timeout = 200 partition_count = 3 message_size = 16 * 1024 # 16KiB segment_size = 1024 * 1024 # 1MiB @@ -203,12 +203,11 @@ def __init__(self, test_context): super().__init__( test_context=test_context, extra_rp_conf={ - "cloud_storage_enable_scrubbing": True, - "cloud_storage_partial_scrub_interval_ms": 1000, + "cloud_storage_partial_scrub_interval_ms": 100, "cloud_storage_full_scrub_interval_ms": 1000, - "cloud_storage_scrubbing_interval_jitter_ms": 100, + "cloud_storage_scrubbing_interval_jitter_ms": 50, # Small quota forces partial scrubs - "cloud_storage_background_jobs_quota": 30, + "cloud_storage_background_jobs_quota": 40, # Disable segment merging since it can reupload # the deleted segment and remove the gap "cloud_storage_enable_segment_merging": False,