Skip to content

Commit

Permalink
Merge pull request redpanda-data#23635 from redpanda-data/migrations-…
Browse files Browse the repository at this point in the history
…fix-foreign-memory-access

c/migrations: convert migrations table to an ssx::single_sharded
  • Loading branch information
mmaslankaprv authored Oct 17, 2024
2 parents ceae89a + 8339db6 commit 0c89e0c
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 20 deletions.
21 changes: 11 additions & 10 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ ss::future<> controller::wire_up() {
.then([this] { return _roles.start(); })
.then([this] { return _data_migrated_resources.start(); })
.then([this] {
_data_migration_table
= std::make_unique<data_migrations::migrations_table>(
_data_migrated_resources,
std::ref(_tp_state),
config::shard_local_cfg().cloud_storage_enabled()
&& config::shard_local_cfg()
.cloud_storage_disable_archiver_manager());
return _data_migration_table.start_on(
data_migrations::data_migrations_shard,
std::ref(_data_migrated_resources),
std::ref(_tp_state),
config::shard_local_cfg().cloud_storage_enabled()
&& config::shard_local_cfg()
.cloud_storage_disable_archiver_manager());
})
.then([this] {
return _authorizer.start(
Expand Down Expand Up @@ -308,7 +308,7 @@ ss::future<> controller::start(
co_await _data_migration_frontend.start(
_raft0->self().id(),
_cloud_storage_api.local_is_initialized(),
std::ref(*_data_migration_table),
std::ref(_data_migration_table),
std::ref(_feature_table),
std::ref(_stm),
std::ref(_partition_leaders),
Expand Down Expand Up @@ -364,7 +364,7 @@ ss::future<> controller::start(
std::ref(_plugin_backend),
std::ref(_recovery_manager),
std::ref(_quota_backend),
std::ref(*_data_migration_table));
std::ref(_data_migration_table.local()));
}

co_await _members_frontend.start(
Expand Down Expand Up @@ -776,7 +776,7 @@ ss::future<> controller::start(

co_await _data_migration_backend.start_on(
data_migrations::data_migrations_shard,
std::ref(*_data_migration_table),
std::ref(_data_migration_table.local()),
std::ref(_data_migration_frontend.local()),
std::ref(_data_migration_worker),
std::ref(_partition_leaders.local()),
Expand Down Expand Up @@ -866,6 +866,7 @@ ss::future<> controller::stop() {
co_await _oidc_service.stop();
co_await _authorizer.stop();
co_await _ephemeral_credentials.stop();
co_await _data_migration_table.stop();
co_await _data_migrated_resources.stop();
co_await _roles.stop();
co_await _credentials.stop();
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ class controller {
ss::sharded<partition_balancer_state>
_partition_balancer_state; // single instance
ss::sharded<data_migrations::migrated_resources> _data_migrated_resources;
std::unique_ptr<data_migrations::migrations_table> _data_migration_table;
ssx::single_sharded<data_migrations::migrations_table>
_data_migration_table;
ss::sharded<partition_leaders_table>
_partition_leaders; // instance per core
ss::sharded<shard_placement_table> _shard_placement; // instance per core
Expand Down
17 changes: 10 additions & 7 deletions src/v/cluster/data_migration_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "partition_leaders_table.h"
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"
#include "ssx/single_sharded.h"

#include <fmt/ostream.h>

Expand All @@ -33,7 +34,7 @@ namespace cluster::data_migrations {
frontend::frontend(
model::node_id self,
bool cloud_storage_api_initialized,
migrations_table& table,
ssx::single_sharded<migrations_table>& table,
ss::sharded<features::feature_table>& features,
ss::sharded<controller_stm>& stm,
ss::sharded<partition_leaders_table>& leaders,
Expand Down Expand Up @@ -259,7 +260,8 @@ ss::future<result<id>> frontend::do_create_migration(data_migration migration) {
co_return make_error_code(errc::data_migration_invalid_resources);
}

auto v_err = _table.validate_migrated_resources(migration);
auto v_err = _table.local().validate_migrated_resources(migration);

if (v_err) {
vlog(
dm_log.warn,
Expand All @@ -269,7 +271,7 @@ ss::future<result<id>> frontend::do_create_migration(data_migration migration) {
co_return make_error_code(errc::data_migration_invalid_resources);
}

auto id = _table.get_next_id();
auto id = _table.local().get_next_id();
ec = co_await replicate_and_wait(
_controller,
_as,
Expand All @@ -285,15 +287,16 @@ ss::future<result<id>> frontend::do_create_migration(data_migration migration) {

ss::future<chunked_vector<migration_metadata>> frontend::list_migrations() {
return container().invoke_on(data_migrations_shard, [](frontend& local) {
return local._table.list_migrations();
return local._table.local().list_migrations();
});
}

ss::future<result<migration_metadata>>
frontend::get_migration(id migration_id) {
return container().invoke_on(
data_migrations_shard, [migration_id](frontend& local) {
auto maybe_migration = local._table.get_migration(migration_id);
auto maybe_migration = local._table.local().get_migration(
migration_id);
return maybe_migration
? result<migration_metadata>(maybe_migration->get().copy())
: errc::data_migration_not_exists;
Expand Down Expand Up @@ -335,7 +338,7 @@ frontend::do_update_migration_state(id id, state state) {
/**
* preliminary validation of migration state transition
*/
auto migration = _table.get_migration(id);
auto migration = _table.local().get_migration(id);
if (!migration) {
vlog(dm_log.warn, "migration {} id not found", id);
co_return errc::data_migration_not_exists;
Expand Down Expand Up @@ -383,7 +386,7 @@ ss::future<std::error_code> frontend::do_remove_migration(id id) {
/**
* preliminary validation of migration existence
*/
auto migration = _table.get_migration(id);
auto migration = _table.local().get_migration(id);
if (!migration) {
vlog(dm_log.warn, "migration {} id not found", id);
co_return errc::data_migration_not_exists;
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/data_migration_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cluster/fwd.h"
#include "features/fwd.h"
#include "rpc/fwd.h"
#include "ssx/single_sharded.h"

#include <seastar/core/sharded.hh>

Expand All @@ -27,7 +28,7 @@ class frontend : public ss::peering_sharded_service<frontend> {
frontend(
model::node_id,
bool,
migrations_table&,
ssx::single_sharded<migrations_table>&,
ss::sharded<features::feature_table>&,
ss::sharded<controller_stm>&,
ss::sharded<partition_leaders_table>&,
Expand Down Expand Up @@ -85,7 +86,7 @@ class frontend : public ss::peering_sharded_service<frontend> {
private:
model::node_id _self;
bool _cloud_storage_api_initialized;
migrations_table& _table;
ssx::single_sharded<migrations_table>& _table;
ss::sharded<features::feature_table>& _features;
ss::sharded<controller_stm>& _controller;
ss::sharded<partition_leaders_table>& _leaders_table;
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/data_migration_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ migrations_table::migrations_table(
, _topics(topics)
, _enabled(enabled) {}

ss::future<> migrations_table::stop() { return ss::now(); }

bool migrations_table::is_valid_state_transition(state current, state target) {
switch (current) {
case state::planned:
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/data_migration_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class migrations_table {
ss::sharded<topic_table>& topics,
bool enabled);

ss::future<> stop();

using notification_id = named_type<uint64_t, struct notification_id_tag>;
using notification_callback = ss::noncopyable_function<void(id)>;

Expand Down

0 comments on commit 0c89e0c

Please sign in to comment.