From fb7be73e57f83ae56a11ada7c71f3e3fdb7775fc Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Mon, 16 Sep 2024 13:43:35 +0100 Subject: [PATCH 1/5] c/migrations: notify migrated resources before backend That's to make sure the resource is locked by the time backend and worker's works are started. --- src/v/cluster/data_migration_table.cc | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/v/cluster/data_migration_table.cc b/src/v/cluster/data_migration_table.cc index fc9df5b615974..3efda0817970b 100644 --- a/src/v/cluster/data_migration_table.cc +++ b/src/v/cluster/data_migration_table.cc @@ -84,7 +84,12 @@ migrations_table::fill_snapshot(controller_snapshot& snapshot) const { ss::future<> migrations_table::apply_snapshot( model::offset, const controller_snapshot& snapshot) { _next_id = snapshot.data_migrations.next_id; - _migrations.reserve(snapshot.data_migrations.migrations.size()); + + auto snapshot_size = snapshot.data_migrations.migrations.size(); + _migrations.reserve(snapshot_size); + + std::vector affected_ids; + affected_ids.reserve(snapshot_size); std::vector deleted; auto it = _migrations.cbegin(); @@ -92,7 +97,7 @@ ss::future<> migrations_table::apply_snapshot( auto prev = it++; if (!snapshot.data_migrations.migrations.contains(prev->first)) { auto extracted = _migrations.extract(prev); - _callbacks.notify(extracted.key()); + affected_ids.push_back(extracted.key()); deleted.push_back(std::move(extracted.mapped())); } } @@ -109,14 +114,19 @@ ss::future<> migrations_table::apply_snapshot( } it->second.state = migration.state; } - _callbacks.notify(id); + affected_ids.push_back(id); updated.emplace_back(it->second); } + // notify callbacks after resources, see comment in migrations_table::apply co_await _resources.invoke_on_all( [&deleted, &updated](migrated_resources& resources) { resources.apply_snapshot(deleted, updated); }); + + for (auto id : affected_ids) { + _callbacks.notify(id); + } } std::optional> @@ -176,12 +186,14 @@ migrations_table::apply(create_data_migration_cmd cmd) { } _last_applied = id; _next_id = std::max(_next_id, _last_applied + data_migrations::id(1)); - _callbacks.notify(id); - // update migrated resources + // It is vital to update resources before subscribers, backend in + // particular. When backend works on the migrated entities it relies on them + // being locked by resources. co_await _resources.invoke_on_all( [&meta = it->second](migrated_resources& resources) { resources.apply_update(meta); }); + _callbacks.notify(id); co_return errc::success; } @@ -279,11 +291,12 @@ migrations_table::apply(update_data_migration_state_cmd cmd) { co_return errc::invalid_data_migration_state; } it->second.state = requested_state; - _callbacks.notify(id); + // notify callbacks after resources, see comment in migrations_table::apply co_await _resources.invoke_on_all( [&meta = it->second](migrated_resources& resources) { resources.apply_update(meta); }); + _callbacks.notify(id); co_return errc::success; } @@ -304,11 +317,11 @@ migrations_table::apply(remove_data_migration_cmd cmd) { case state::planned: { auto meta = std::move(it->second); _migrations.erase(it); - _callbacks.notify(id); co_await _resources.invoke_on_all( [&meta](migrated_resources& resources) { resources.remove_migration(meta); }); + _callbacks.notify(id); co_return errc::success; } From 02e5244e88a00eed1de038922ebb141bea24fbe6 Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Mon, 16 Sep 2024 15:55:30 +0100 Subject: [PATCH 2/5] c/migrations: introduce create_only restriction level for migrated resources This is to disallow a situation where a coordinator regained leadership is creating a topic when it's time to actually delete it as the migration has been cancelled. --- src/v/cluster/data_migrated_resources.cc | 1 + src/v/cluster/data_migration_types.cc | 2 ++ src/v/cluster/data_migration_types.h | 1 + src/v/cluster/metadata_cache.cc | 2 +- src/v/cluster/topics_frontend.cc | 25 ++++++++++++++---------- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/v/cluster/data_migrated_resources.cc b/src/v/cluster/data_migrated_resources.cc index df3a96b3de28d..a45a69943dc7d 100644 --- a/src/v/cluster/data_migrated_resources.cc +++ b/src/v/cluster/data_migrated_resources.cc @@ -51,6 +51,7 @@ migrated_resource_state get_resource_state(state state) { case state::planned: return migrated_resource_state::metadata_locked; case state::preparing: + return migrated_resource_state::create_only; case state::prepared: case state::canceling: case state::executing: diff --git a/src/v/cluster/data_migration_types.cc b/src/v/cluster/data_migration_types.cc index d366939aa977a..a4c29cf22dc4b 100644 --- a/src/v/cluster/data_migration_types.cc +++ b/src/v/cluster/data_migration_types.cc @@ -95,6 +95,8 @@ std::ostream& operator<<(std::ostream& o, migrated_resource_state state) { return o << "restricted"; case migrated_resource_state::read_only: return o << "read_only"; + case migrated_resource_state::create_only: + return o << "create_only"; case migrated_resource_state::fully_blocked: return o << "fully_blocked"; } diff --git a/src/v/cluster/data_migration_types.h b/src/v/cluster/data_migration_types.h index 8b1df1a0ba6b7..4b7bae645a94e 100644 --- a/src/v/cluster/data_migration_types.h +++ b/src/v/cluster/data_migration_types.h @@ -101,6 +101,7 @@ enum class migrated_resource_state { non_restricted, metadata_locked, read_only, + create_only, // can only be created, and only by migrations fully_blocked }; diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 94f2045fd0e51..cff7440af7da6 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -154,7 +154,7 @@ bool metadata_cache::should_reject_writes() const { bool metadata_cache::should_reject_reads(model::topic_namespace_view tp) const { return _migrated_resources.local().get_topic_state(tp) - >= data_migrations::migrated_resource_state::fully_blocked; + >= data_migrations::migrated_resource_state::create_only; } bool metadata_cache::should_reject_writes( diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index e7d188bb30398..effaaaa39f2ec 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -430,24 +430,29 @@ errc topics_frontend::validate_topic_configuration( ss::future topics_frontend::do_create_topic( custom_assignable_topic_configuration assignable_config, model::timeout_clock::time_point timeout) { - if (_topics.local().contains(assignable_config.cfg.tp_ns)) { + auto& tp_ns = assignable_config.cfg.tp_ns; + if (_topics.local().contains(tp_ns)) { vlog( clusterlog.trace, "unable to create topic {} as it already exists", - assignable_config.cfg.tp_ns); - co_return topic_result( - assignable_config.cfg.tp_ns, errc::topic_already_exists); + tp_ns); + co_return topic_result(tp_ns, errc::topic_already_exists); } - if ( - !assignable_config.cfg.is_migrated - && _migrated_resources.is_already_migrated(assignable_config.cfg.tp_ns)) { + bool blocked = assignable_config.cfg.is_migrated + ? _migrated_resources.get_topic_state(tp_ns) + > data_migrations::migrated_resource_state::create_only + : _migrated_resources.is_already_migrated(tp_ns); + if (blocked) { vlog( clusterlog.warn, - "unable to create topic {} as it is being migrated", - assignable_config.cfg.tp_ns); + "unable to create topic {} as it is being migrated: " + "cfg.is_migrated={}, migrated resource state is {}", + assignable_config.cfg.tp_ns, + assignable_config.cfg.is_migrated, + _migrated_resources.get_topic_state(tp_ns)); co_return topic_result( - assignable_config.cfg.tp_ns, errc::topic_already_exists); + assignable_config.cfg.tp_ns, errc::resource_is_being_migrated); } auto validation_err = validate_topic_configuration(assignable_config); From a3652d8b6eee6dcfab14a03450d9e9024a89f80f Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Mon, 16 Sep 2024 21:20:42 +0100 Subject: [PATCH 3/5] tests/migrations: fix typo --- tests/rptest/tests/data_migrations_api_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py index 76be8b70f2494..6a34b966666e2 100644 --- a/tests/rptest/tests/data_migrations_api_test.py +++ b/tests/rptest/tests/data_migrations_api_test.py @@ -144,7 +144,7 @@ def migration_in_one_of_states(): timeout_sec=90, backoff_sec=1, err_msg= - f"Failed waiting for migration {id} to reach on of {states} states" + f"Failed waiting for migration {id} to reach one of {states} states" ) def wait_migration_appear(self, migration_id): From b9dc40ff1a7115638f58b9c28185d89bca9dfccc Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Thu, 19 Sep 2024 15:22:30 +0100 Subject: [PATCH 4/5] c/migrations: allow no-op migration state updates Previously the assert was too strict --- src/v/cluster/data_migration_backend.cc | 37 ++++++++++++------------- src/v/cluster/data_migration_backend.h | 4 +-- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/src/v/cluster/data_migration_backend.cc b/src/v/cluster/data_migration_backend.cc index fcf60a9268bbe..7f6776add1536 100644 --- a/src/v/cluster/data_migration_backend.cc +++ b/src/v/cluster/data_migration_backend.cc @@ -849,14 +849,17 @@ ss::future<> backend::handle_migration_update(id id) { auto units = co_await _mutex.get_units(_as); vlog(dm_log.debug, "lock acquired for data migration {} notification", id); - bool need_wakeup = false; - auto new_maybe_metadata = _table.get_migration(id); auto new_state = new_maybe_metadata ? std::make_optional( new_maybe_metadata->get().state) : std::nullopt; vlog(dm_log.debug, "migration {} new state is {}", id, new_state); + work_scope new_scope; + if (new_maybe_metadata) { + new_scope = get_work_scope(new_maybe_metadata->get()); + } + // forget about the migration if it went forward or is gone auto old_it = std::as_const(_migration_states).find(id); if (old_it != _migration_states.cend()) { @@ -867,8 +870,9 @@ ss::future<> backend::handle_migration_update(id id) { id, old_mrstate.scope.sought_state); vassert( - !new_maybe_metadata || new_state >= old_mrstate.scope.sought_state, - "migration state went from seeking {} back to {}", + !new_scope.sought_state + || new_scope.sought_state >= old_mrstate.scope.sought_state, + "migration state went from seeking {} back seeking to seeking {}", old_mrstate.scope.sought_state, new_state); vlog(dm_log.debug, "dropping migration {} reconciliation state", id); @@ -881,24 +885,19 @@ ss::future<> backend::handle_migration_update(id id) { } } // create new state if needed - if (new_maybe_metadata) { - const auto& new_metadata = new_maybe_metadata->get(); - auto scope = get_work_scope(new_metadata); - if (scope.sought_state.has_value()) { - vlog( - dm_log.debug, "creating migration {} reconciliation state", id); - auto new_it = _migration_states.emplace_hint(old_it, id, scope); - if (scope.topic_work_needed || scope.partition_work_needed) { - co_await reconcile_migration(new_it->second, new_metadata); - } else { - // yes it is done as there is nothing to do - to_advance_if_done(new_it); - } - need_wakeup = true; + if (new_scope.sought_state) { + vlog(dm_log.debug, "creating migration {} reconciliation state", id); + auto new_it = _migration_states.emplace_hint(old_it, id, new_scope); + if (new_scope.topic_work_needed || new_scope.partition_work_needed) { + co_await reconcile_migration( + new_it->second, new_maybe_metadata->get()); + } else { + // yes it is done as there is nothing to do + to_advance_if_done(new_it); } } - if (_is_coordinator && need_wakeup) { + if (new_scope.sought_state && _is_coordinator) { wakeup(); } } diff --git a/src/v/cluster/data_migration_backend.h b/src/v/cluster/data_migration_backend.h index a0dbb1143401d..87bf49b8ca95e 100644 --- a/src/v/cluster/data_migration_backend.h +++ b/src/v/cluster/data_migration_backend.h @@ -50,8 +50,8 @@ class backend { private: struct work_scope { std::optional sought_state; - bool partition_work_needed; - bool topic_work_needed; + bool partition_work_needed = false; + bool topic_work_needed = false; }; struct topic_reconciliation_state { size_t idx_in_migration; From a8e05ed3faa2da39609c35014f24f52767e1405f Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Fri, 20 Sep 2024 12:29:40 +0100 Subject: [PATCH 5/5] tests/migrations: do not recheck migration absense when finjector is around We already check it in self.wait_migration_disappear, and the recheck may use a node with late raft0. --- tests/rptest/tests/data_migrations_api_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py index 6a34b966666e2..677469413f50b 100644 --- a/tests/rptest/tests/data_migrations_api_test.py +++ b/tests/rptest/tests/data_migrations_api_test.py @@ -331,8 +331,6 @@ def test_higher_level_migration_api(self): migrations_map = self.get_migrations_map() self.logger.info(f"migrations: {migrations_map}") - assert len( - migrations_map) == 0, "There should be no data migrations" # in inbound_topics = [ @@ -360,8 +358,6 @@ def test_higher_level_migration_api(self): ) migrations_map = self.get_migrations_map() self.logger.info(f"migrations: {migrations_map}") - assert len( - migrations_map) == 0, "There should be no data migrations" # todo: fix rp_storage_tool to use overridden topic names self.redpanda.si_settings.set_expected_damage(