Merge pull request redpanda-data#23335 from bashtanov/migrations-race-conditions-fixes

Migrations race conditions fixes
bashtanov authored Sep 23, 2024
2 parents f0626da + a8e05ed commit f253cda
Showing 9 changed files with 61 additions and 44 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/data_migrated_resources.cc
@@ -51,6 +51,7 @@ migrated_resource_state get_resource_state<inbound_migration>(state state) {
case state::planned:
return migrated_resource_state::metadata_locked;
case state::preparing:
return migrated_resource_state::create_only;
case state::prepared:
case state::canceling:
case state::executing:
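For context, here is a minimal standalone sketch of the mapping this one-line change alters. The enum contents are abridged from this PR's data_migration_types.h hunk; what the states below preparing map to lies outside the hunk, so the default branch is a placeholder, not the real value.

#include <cassert>

// Abridged re-creation of the enums from src/v/cluster/data_migration_types.h.
enum class state { planned, preparing, prepared, canceling, executing };

enum class migrated_resource_state {
    non_restricted,
    metadata_locked,
    read_only,
    create_only, // can only be created, and only by migrations
    fully_blocked
};

// After this commit a preparing inbound migration maps to the new
// create_only level instead of falling through to the later states.
migrated_resource_state get_resource_state(state s) {
    switch (s) {
    case state::planned:
        return migrated_resource_state::metadata_locked;
    case state::preparing:
        return migrated_resource_state::create_only;
    default:
        // prepared/canceling/executing: placeholder, the real value is
        // outside this hunk
        return migrated_resource_state::read_only;
    }
}

int main() {
    assert(
      get_resource_state(state::preparing)
      == migrated_resource_state::create_only);
}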
37 changes: 18 additions & 19 deletions src/v/cluster/data_migration_backend.cc
@@ -849,14 +849,17 @@ ss::future<> backend::handle_migration_update(id id) {
auto units = co_await _mutex.get_units(_as);
vlog(dm_log.debug, "lock acquired for data migration {} notification", id);

bool need_wakeup = false;

auto new_maybe_metadata = _table.get_migration(id);
auto new_state = new_maybe_metadata ? std::make_optional<state>(
new_maybe_metadata->get().state)
: std::nullopt;
vlog(dm_log.debug, "migration {} new state is {}", id, new_state);

work_scope new_scope;
if (new_maybe_metadata) {
new_scope = get_work_scope(new_maybe_metadata->get());
}

// forget about the migration if it went forward or is gone
auto old_it = std::as_const(_migration_states).find(id);
if (old_it != _migration_states.cend()) {
@@ -867,8 +870,9 @@ ss::future<> backend::handle_migration_update(id id) {
id,
old_mrstate.scope.sought_state);
vassert(
!new_maybe_metadata || new_state >= old_mrstate.scope.sought_state,
"migration state went from seeking {} back to {}",
!new_scope.sought_state
|| new_scope.sought_state >= old_mrstate.scope.sought_state,
"migration state went from seeking {} back seeking to seeking {}",
old_mrstate.scope.sought_state,
new_state);
vlog(dm_log.debug, "dropping migration {} reconciliation state", id);
Expand All @@ -881,24 +885,19 @@ ss::future<> backend::handle_migration_update(id id) {
}
}
// create new state if needed
if (new_maybe_metadata) {
const auto& new_metadata = new_maybe_metadata->get();
auto scope = get_work_scope(new_metadata);
if (scope.sought_state.has_value()) {
vlog(
dm_log.debug, "creating migration {} reconciliation state", id);
auto new_it = _migration_states.emplace_hint(old_it, id, scope);
if (scope.topic_work_needed || scope.partition_work_needed) {
co_await reconcile_migration(new_it->second, new_metadata);
} else {
// yes, it is done, as there is nothing to do
to_advance_if_done(new_it);
}
need_wakeup = true;
if (new_scope.sought_state) {
vlog(dm_log.debug, "creating migration {} reconciliation state", id);
auto new_it = _migration_states.emplace_hint(old_it, id, new_scope);
if (new_scope.topic_work_needed || new_scope.partition_work_needed) {
co_await reconcile_migration(
new_it->second, new_maybe_metadata->get());
} else {
// yes, it is done, as there is nothing to do
to_advance_if_done(new_it);
}
}

if (_is_coordinator && need_wakeup) {
if (new_scope.sought_state && _is_coordinator) {
wakeup();
}
}
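The net effect of this file's hunks is that the work scope is computed once, up front, and new_scope.sought_state then drives both the creation of reconciliation state and the coordinator wakeup, removing the separate need_wakeup flag that had to be kept in sync by hand. A condensed, Seastar-free sketch of the resulting control flow (names taken from the diff, everything else simplified; not the real backend code):

#include <optional>
#include <unordered_map>

enum class state { preparing, prepared, executing, finished };

struct work_scope {
    std::optional<state> sought_state;
    bool partition_work_needed = false;
    bool topic_work_needed = false;
};

struct backend_sketch {
    std::unordered_map<int, work_scope> _migration_states;
    bool _is_coordinator = false;
    bool _woken_up = false;

    // new_scope stays value-initialized (empty sought_state, flags false)
    // when the migration is gone from the table, mirroring the patch.
    void handle_migration_update(int id, work_scope new_scope) {
        // forget about the migration if it went forward or is gone
        _migration_states.erase(id);

        // create new state if needed
        if (new_scope.sought_state) {
            _migration_states.emplace(id, new_scope);
            // the real code then either reconciles the migration or marks
            // it done, depending on the *_work_needed flags
        }

        // wakeup is derived from the same condition that created the
        // state, so it cannot drift from a hand-maintained need_wakeup
        if (new_scope.sought_state && _is_coordinator) {
            _woken_up = true;
        }
    }
};

int main() {
    backend_sketch b;
    b._is_coordinator = true;
    b.handle_migration_update(7, {state::prepared, false, false});
    return b._woken_up ? 0 : 1;
}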
4 changes: 2 additions & 2 deletions src/v/cluster/data_migration_backend.h
@@ -50,8 +50,8 @@ class backend {
private:
struct work_scope {
std::optional<state> sought_state;
bool partition_work_needed;
bool topic_work_needed;
bool partition_work_needed = false;
bool topic_work_needed = false;
};
struct topic_reconciliation_state {
size_t idx_in_migration;
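This header change is small but load-bearing: handle_migration_update now default-constructs a work_scope before knowing whether the migration still exists, so the two flags need in-class initializers to avoid reading indeterminate values. A self-contained illustration (work_scope reproduced from the diff; the "before" variant is renamed here only for contrast):

#include <optional>

enum class state { preparing, prepared };

// Before: the bools had no initializers, so a default-constructed
// work_scope carried indeterminate values and reading the flags was
// undefined behavior.
struct work_scope_before {
    std::optional<state> sought_state;
    bool partition_work_needed; // indeterminate if default-constructed
    bool topic_work_needed;     // indeterminate if default-constructed
};

// After: default member initializers make "no migration, no work" a
// safe default state.
struct work_scope {
    std::optional<state> sought_state;
    bool partition_work_needed = false;
    bool topic_work_needed = false;
};

int main() {
    work_scope s; // sought_state empty, both flags false: nothing to do
    return s.partition_work_needed || s.topic_work_needed;
}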
27 changes: 20 additions & 7 deletions src/v/cluster/data_migration_table.cc
@@ -84,15 +84,20 @@ migrations_table::fill_snapshot(controller_snapshot& snapshot) const {
ss::future<> migrations_table::apply_snapshot(
model::offset, const controller_snapshot& snapshot) {
_next_id = snapshot.data_migrations.next_id;
_migrations.reserve(snapshot.data_migrations.migrations.size());

auto snapshot_size = snapshot.data_migrations.migrations.size();
_migrations.reserve(snapshot_size);

std::vector<id> affected_ids;
affected_ids.reserve(snapshot_size);

std::vector<migration_metadata> deleted;
auto it = _migrations.cbegin();
while (it != _migrations.cend()) {
auto prev = it++;
if (!snapshot.data_migrations.migrations.contains(prev->first)) {
auto extracted = _migrations.extract(prev);
_callbacks.notify(extracted.key());
affected_ids.push_back(extracted.key());
deleted.push_back(std::move(extracted.mapped()));
}
}
@@ -109,14 +114,19 @@ ss::future<> migrations_table::apply_snapshot(
}
it->second.state = migration.state;
}
_callbacks.notify(id);
affected_ids.push_back(id);
updated.emplace_back(it->second);
}

// notify callbacks after resources, see comment in migrations_table::apply
co_await _resources.invoke_on_all(
[&deleted, &updated](migrated_resources& resources) {
resources.apply_snapshot(deleted, updated);
});

for (auto id : affected_ids) {
_callbacks.notify(id);
}
}

std::optional<std::reference_wrapper<const migration_metadata>>
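apply_snapshot now funnels every touched migration id into affected_ids and fires _callbacks.notify only after migrated_resources has been brought up to date, matching the resources-before-subscribers rule stated in migrations_table::apply. A stripped-down sketch of the pattern, with plain containers standing in for the real sharded Seastar services and notify standing in for _callbacks.notify:

#include <cassert>
#include <functional>
#include <vector>

struct resources_stub { bool up_to_date = false; };

void apply_snapshot_sketch(
  const std::vector<int>& deleted_ids,
  const std::vector<int>& updated_ids,
  resources_stub& resources,
  const std::function<void(int)>& notify) {
    // 1. collect every affected id while mutating the table
    std::vector<int> affected_ids;
    affected_ids.insert(
      affected_ids.end(), deleted_ids.begin(), deleted_ids.end());
    affected_ids.insert(
      affected_ids.end(), updated_ids.begin(), updated_ids.end());

    // 2. update migrated resources first (invoke_on_all in the real code)
    resources.up_to_date = true;

    // 3. only then notify subscribers, so each callback observes
    // resources already consistent with the snapshot
    for (int id : affected_ids) {
        notify(id);
    }
}

int main() {
    resources_stub r;
    bool saw_consistent = true;
    apply_snapshot_sketch(
      {1}, {2, 3}, r, [&](int) { saw_consistent &= r.up_to_date; });
    assert(saw_consistent);
}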
@@ -176,12 +186,14 @@ migrations_table::apply(create_data_migration_cmd cmd) {
}
_last_applied = id;
_next_id = std::max(_next_id, _last_applied + data_migrations::id(1));
_callbacks.notify(id);
// update migrated resources
// It is vital to update resources before subscribers, the backend in
// particular: when the backend works on the migrated entities it relies
// on them being locked by resources.
co_await _resources.invoke_on_all(
[&meta = it->second](migrated_resources& resources) {
resources.apply_update(meta);
});
_callbacks.notify(id);

co_return errc::success;
}
@@ -279,11 +291,12 @@ migrations_table::apply(update_data_migration_state_cmd cmd) {
co_return errc::invalid_data_migration_state;
}
it->second.state = requested_state;
_callbacks.notify(id);
// notify callbacks after resources, see comment in migrations_table::apply
co_await _resources.invoke_on_all(
[&meta = it->second](migrated_resources& resources) {
resources.apply_update(meta);
});
_callbacks.notify(id);

co_return errc::success;
}
@@ -304,11 +317,11 @@ migrations_table::apply(remove_data_migration_cmd cmd) {
case state::planned: {
auto meta = std::move(it->second);
_migrations.erase(it);
_callbacks.notify(id);
co_await _resources.invoke_on_all(
[&meta](migrated_resources& resources) {
resources.remove_migration(meta);
});
_callbacks.notify(id);

co_return errc::success;
}
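Each of the three apply() overloads above moves _callbacks.notify(id) below the _resources.invoke_on_all(...) update. The race this closes is easiest to see in a single-threaded stand-in: if subscribers run before the resource state is written, a subscriber such as the backend can observe an entity that is not locked yet. A minimal illustration (plain functions in place of the real cross-shard calls):

#include <cassert>
#include <functional>

enum class resource_state { non_restricted, metadata_locked };

struct table_sketch {
    resource_state resource = resource_state::non_restricted;
    std::function<void()> on_notify;

    void apply_buggy() {
        on_notify();                                // subscriber runs first...
        resource = resource_state::metadata_locked; // ...lock arrives too late
    }

    void apply_fixed() {
        resource = resource_state::metadata_locked; // lock resources first
        on_notify();                                // then wake subscribers
    }
};

int main() {
    table_sketch t;
    resource_state seen = resource_state::non_restricted;
    t.on_notify = [&] { seen = t.resource; };
    t.apply_fixed();
    assert(seen == resource_state::metadata_locked);
}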
2 changes: 2 additions & 0 deletions src/v/cluster/data_migration_types.cc
@@ -95,6 +95,8 @@ std::ostream& operator<<(std::ostream& o, migrated_resource_state state) {
return o << "restricted";
case migrated_resource_state::read_only:
return o << "read_only";
case migrated_resource_state::create_only:
return o << "create_only";
case migrated_resource_state::fully_blocked:
return o << "fully_blocked";
}
1 change: 1 addition & 0 deletions src/v/cluster/data_migration_types.h
@@ -101,6 +101,7 @@ enum class migrated_resource_state {
non_restricted,
metadata_locked,
read_only,
create_only, // can only be created, and only by migrations
fully_blocked
};

2 changes: 1 addition & 1 deletion src/v/cluster/metadata_cache.cc
@@ -154,7 +154,7 @@ bool metadata_cache::should_reject_writes() const {

bool metadata_cache::should_reject_reads(model::topic_namespace_view tp) const {
return _migrated_resources.local().get_topic_state(tp)
>= data_migrations::migrated_resource_state::fully_blocked;
>= data_migrations::migrated_resource_state::create_only;
}

bool metadata_cache::should_reject_writes(
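should_reject_reads relies on the enumerators of migrated_resource_state being ordered from least to most restrictive, which is why create_only was inserted between read_only and fully_blocked in data_migration_types.h. Lowering the threshold from fully_blocked to create_only means reads are now rejected for create_only topics as well. A compile-time check of that ordering assumption (enum copied from the diff, predicate condensed from metadata_cache.cc):

enum class migrated_resource_state {
    non_restricted,
    metadata_locked,
    read_only,
    create_only, // can only be created, and only by migrations
    fully_blocked
};

bool should_reject_reads(migrated_resource_state topic_state) {
    return topic_state >= migrated_resource_state::create_only;
}

// the >= threshold is only meaningful because of the declaration order
static_assert(
  migrated_resource_state::read_only < migrated_resource_state::create_only);
static_assert(
  migrated_resource_state::create_only < migrated_resource_state::fully_blocked);

int main() {
    return should_reject_reads(migrated_resource_state::create_only) ? 0 : 1;
}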
25 changes: 15 additions & 10 deletions src/v/cluster/topics_frontend.cc
@@ -430,24 +430,29 @@ errc topics_frontend::validate_topic_configuration(
ss::future<topic_result> topics_frontend::do_create_topic(
custom_assignable_topic_configuration assignable_config,
model::timeout_clock::time_point timeout) {
if (_topics.local().contains(assignable_config.cfg.tp_ns)) {
auto& tp_ns = assignable_config.cfg.tp_ns;
if (_topics.local().contains(tp_ns)) {
vlog(
clusterlog.trace,
"unable to create topic {} as it already exists",
assignable_config.cfg.tp_ns);
co_return topic_result(
assignable_config.cfg.tp_ns, errc::topic_already_exists);
tp_ns);
co_return topic_result(tp_ns, errc::topic_already_exists);
}

if (
!assignable_config.cfg.is_migrated
&& _migrated_resources.is_already_migrated(assignable_config.cfg.tp_ns)) {
bool blocked = assignable_config.cfg.is_migrated
? _migrated_resources.get_topic_state(tp_ns)
> data_migrations::migrated_resource_state::create_only
: _migrated_resources.is_already_migrated(tp_ns);
if (blocked) {
vlog(
clusterlog.warn,
"unable to create topic {} as it is being migrated",
assignable_config.cfg.tp_ns);
"unable to create topic {} as it is being migrated: "
"cfg.is_migrated={}, migrated resource state is {}",
assignable_config.cfg.tp_ns,
assignable_config.cfg.is_migrated,
_migrated_resources.get_topic_state(tp_ns));
co_return topic_result(
assignable_config.cfg.tp_ns, errc::topic_already_exists);
assignable_config.cfg.tp_ns, errc::resource_is_being_migrated);
}

auto validation_err = validate_topic_configuration(assignable_config);
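Besides returning resource_is_being_migrated instead of the misleading topic_already_exists, the rewritten check makes the decision depend on who is creating the topic: a migration-driven creation (cfg.is_migrated) is allowed up to and including the create_only state, while an ordinary creation is blocked whenever the topic is tied to a migration at all. A sketch of the predicate; note that is_already_migrated is approximated here as "any restriction at all", which is an assumption, not the real implementation:

#include <cassert>

enum class migrated_resource_state {
    non_restricted, metadata_locked, read_only, create_only, fully_blocked
};

// assumption: stand-in for migrated_resources::is_already_migrated
bool is_already_migrated(migrated_resource_state s) {
    return s > migrated_resource_state::non_restricted;
}

// Decision sketch matching the new do_create_topic() logic.
bool create_blocked(bool is_migrated, migrated_resource_state topic_state) {
    return is_migrated
      ? topic_state > migrated_resource_state::create_only
      : is_already_migrated(topic_state);
}

int main() {
    // migration-driven create of a create_only topic is allowed
    assert(!create_blocked(true, migrated_resource_state::create_only));
    // an ordinary create of the same topic is rejected
    assert(create_blocked(false, migrated_resource_state::create_only));
}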
6 changes: 1 addition & 5 deletions tests/rptest/tests/data_migrations_api_test.py
@@ -144,7 +144,7 @@ def migration_in_one_of_states():
timeout_sec=90,
backoff_sec=1,
err_msg=
f"Failed waiting for migration {id} to reach on of {states} states"
f"Failed waiting for migration {id} to reach one of {states} states"
)

def wait_migration_appear(self, migration_id):
@@ -331,8 +331,6 @@ def test_higher_level_migration_api(self):

migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
assert len(
migrations_map) == 0, "There should be no data migrations"

# in
inbound_topics = [
@@ -360,8 +358,6 @@ def test_higher_level_migration_api(self):
)
migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
assert len(
migrations_map) == 0, "There should be no data migrations"

# todo: fix rp_storage_tool to use overridden topic names
self.redpanda.si_settings.set_expected_damage(
