Skip to content

Commit

Permalink
cluster/config_manager: fix double apply of configs
Browse files Browse the repository at this point in the history
previously, to support validation, the code would call a method and then
call it again with a smp::invoke_on_all.

this pr preserve this behavior but uses invoke_on_others to eliminate
the double-apply on the current shard.

to support this, the lambdas needs to be no_except_copy/move, so the
parameters are taken by pointer, since they live in the parent scope
  • Loading branch information
andijcr committed Oct 2, 2023
1 parent 862e889 commit e8d6bd3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
22 changes: 13 additions & 9 deletions src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,9 @@ config_manager::preload_join(const controller_join_snapshot& snap) {
preload_local(key, value, std::ref(result));

// Broadcast value to all shards
co_await ss::smp::invoke_on_all(
[&key, &value]() { preload_local(key, value, std::nullopt); });
co_await ss::smp::invoke_on_others([key = &key, value = &value]() {
preload_local(*key, *value, std::nullopt);
});
}

co_return result;
Expand Down Expand Up @@ -452,15 +453,16 @@ ss::future<bool> config_manager::load_bootstrap() {
i.second);
}

co_await ss::smp::invoke_on_all(
[&config] { config::shard_local_cfg().read_yaml(config, {}); });
co_await ss::smp::invoke_on_others(
[config = &config] { config::shard_local_cfg().read_yaml(*config, {}); });

co_return true;
}

ss::future<> config_manager::load_legacy(YAML::Node const& legacy_config) {
co_await ss::smp::invoke_on_all(
[&legacy_config] { config::shard_local_cfg().load(legacy_config); });
co_await ss::smp::invoke_on_others([legacy_config = &legacy_config] {
config::shard_local_cfg().load(*legacy_config);
});

// This node has never seen a cluster configuration message.
// Bootstrap configuration from local yaml file.
Expand Down Expand Up @@ -521,8 +523,9 @@ ss::future<config_manager::preload_result> config_manager::load_cache() {
}

// Broadcast value to all shards
co_await ss::smp::invoke_on_all(
[&key, &value]() { preload_local(key, value, std::nullopt); });
co_await ss::smp::invoke_on_others([key = &key, value = &value]() {
preload_local(*key, *value, std::nullopt);
});
}

co_return result;
Expand Down Expand Up @@ -882,7 +885,8 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) {
// errors, so only need to check the errors on one)
auto apply_r = apply_local(data, false);

co_await ss::smp::invoke_on_all([&data] { apply_local(data, true); });
co_await ss::smp::invoke_on_others(
[data = &data] { apply_local(*data, true); });

// Merge results from this delta into our status.
my_latest_status.version = delta_version;
Expand Down
8 changes: 4 additions & 4 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -661,11 +661,11 @@ void application::hydrate_config(const po::variables_map& cfg) {
}
};

ss::smp::invoke_on_all([&config, cfg_path] {
config::node().load(cfg_path, config);
ss::smp::invoke_on_others([config = &config, cfg_path = &cfg_path] {
config::node().load(*cfg_path, *config);
}).get0();

auto node_config_errors = config::node().load(config);
auto node_config_errors = config::node().load(cfg_path, config);
for (const auto& i : node_config_errors) {
vlog(
_log.warn,
Expand Down Expand Up @@ -2198,7 +2198,7 @@ void application::start_runtime_services(

// FIXME: in first patch explain why this is started after the
// controller so the broker set will be available. Then next patch fix.
syschecks::systemd_message("Starting metadata dissination service").get();
syschecks::systemd_message("Starting metadata dissemination service").get();
md_dissemination_service
.invoke_on_all(&cluster::metadata_dissemination_service::start)
.get();
Expand Down

0 comments on commit e8d6bd3

Please sign in to comment.