Skip to content

Commit

Permalink
r/config_manager: converted start method to use coroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 4, 2023
1 parent 73abd0c commit d3416de
Showing 1 changed file with 31 additions and 44 deletions.
75 changes: 31 additions & 44 deletions src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,60 +325,47 @@ ss::future<>
configuration_manager::start(bool reset, model::revision_id initial_revision) {
_initial_revision = initial_revision;
if (reset) {
return _storage.kvs()
.remove(
storage::kvstore::key_space::consensus, configurations_map_key())
.then([this] {
return _storage.kvs().remove(
storage::kvstore::key_space::consensus,
highest_known_offset_key());
});
co_await _storage.kvs().remove(
storage::kvstore::key_space::consensus, configurations_map_key());

co_return co_await _storage.kvs().remove(
storage::kvstore::key_space::consensus, highest_known_offset_key());
}

auto map_buf = _storage.kvs().get(
storage::kvstore::key_space::consensus, configurations_map_key());
auto idx_buf = _storage.kvs().get(
storage::kvstore::key_space::consensus, next_configuration_idx_key());
return _lock.with([this,
map_buf = std::move(map_buf),
idx_buf = std::move(idx_buf)]() mutable {
auto f = ss::now();

if (map_buf) {
_next_index = configuration_idx(0);
if (idx_buf) {
_next_index = reflection::from_iobuf<configuration_idx>(
std::move(*idx_buf));
}
f = deserialize_configurations(_next_index, std::move(*map_buf))
.then([this](underlying_t cfgs) {
_configurations = std::move(cfgs);
if (!_configurations.empty()) {
_highest_known_offset
= _configurations.rbegin()->first;
_next_index = _configurations.rbegin()->second.idx
+ configuration_idx(1);
}
});
}
auto u = co_await _lock.get_units();

auto offset_buf = _storage.kvs().get(
storage::kvstore::key_space::consensus, highest_known_offset_key());
if (offset_buf) {
f = f.then([this, buf = std::move(*offset_buf)]() mutable {
auto offset = reflection::from_iobuf<model::offset>(
std::move(buf));
if (map_buf) {
_next_index = configuration_idx(0);
if (idx_buf) {
_next_index = reflection::from_iobuf<configuration_idx>(
std::move(*idx_buf));
}
_configurations = co_await deserialize_configurations(
_next_index, std::move(*map_buf));

_highest_known_offset = std::max(_highest_known_offset, offset);
});
if (!_configurations.empty()) {
_highest_known_offset = _configurations.rbegin()->first;
_next_index = _configurations.rbegin()->second.idx
+ configuration_idx(1);
}
}

return f.then([this] {
for (auto& [o, icfg] : _configurations) {
icfg.cfg.maybe_set_initial_revision(_initial_revision);
}
});
});
auto offset_buf = _storage.kvs().get(
storage::kvstore::key_space::consensus, highest_known_offset_key());
if (offset_buf) {
auto offset = reflection::from_iobuf<model::offset>(
std::move(*offset_buf));

_highest_known_offset = std::max(_highest_known_offset, offset);
}

for (auto& [o, icfg] : _configurations) {
icfg.cfg.maybe_set_initial_revision(_initial_revision);
}
}

void configuration_manager::maybe_store_highest_known_offset_in_background(
Expand Down

0 comments on commit d3416de

Please sign in to comment.