Commit
Merge pull request redpanda-data#13586 from mmaslankaprv/raft-flush-timer

Added a size-based (pending bytes) background flush to Raft
mmaslankaprv authored Oct 4, 2023
2 parents 5f6dffa + ce1ff99 commit 0459a51
Showing 14 changed files with 302 additions and 66 deletions.
15 changes: 15 additions & 0 deletions src/v/config/configuration.cc
@@ -259,6 +259,21 @@ configuration::configuration()
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
       64,
       {.min = 1, .max = 16384})
+  , raft_replica_max_pending_flush_bytes(
+      *this,
+      "raft_replica_max_pending_flush_bytes",
+      "Maximum number of unflushed bytes per partition. If the configured "
+      "threshold is reached, the log is automatically flushed even though it "
+      "wasn't explicitly requested",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      256_KiB)
+  , raft_flush_timer_interval_ms(
+      *this,
+      "raft_flush_timer_interval_ms",
+      "Interval at which each partition is checked against the "
+      "`raft_replica_max_pending_flush_bytes` threshold",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      100ms)
   , enable_usage(
       *this,
       "enable_usage",
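Taken together, the two new tunables bound both how much unflushed data a partition can accumulate and how long it can sit unflushed: every `raft_flush_timer_interval_ms` (100ms by default) the partitions on a shard are checked, and any partition holding more than `raft_replica_max_pending_flush_bytes` (256 KiB by default) of unflushed data is flushed. With these defaults, a partition ingesting 10 MB/s crosses the threshold in roughly 25 ms and is flushed on essentially every tick, while an idle partition is never flushed by the timer. A minimal sketch of the resulting policy — hypothetical standalone code, not part of this commit:

#include <cstddef>
#include <optional>

struct flush_policy {
    // std::nullopt disables the byte-based background flush entirely.
    std::optional<size_t> max_pending_flush_bytes{256 * 1024};

    // Evaluated on every timer tick, for each partition on the shard.
    bool should_flush(size_t not_flushed_bytes) const {
        return max_pending_flush_bytes.has_value()
               && not_flushed_bytes >= *max_pending_flush_bytes;
    }
};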
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
@@ -87,6 +87,8 @@ struct configuration final : public config_store {
     bounded_property<size_t> raft_recovery_default_read_size;
     property<bool> raft_enable_lw_heartbeat;
     bounded_property<size_t> raft_recovery_concurrency_per_shard;
+    property<std::optional<size_t>> raft_replica_max_pending_flush_bytes;
+    property<std::chrono::milliseconds> raft_flush_timer_interval_ms;
     // Kafka
     property<bool> enable_usage;
     bounded_property<size_t> usage_num_windows;
2 changes: 1 addition & 1 deletion src/v/raft/append_entries_buffer.cc
@@ -109,7 +109,7 @@ ss::future<> append_entries_buffer::do_flush(
         }
     }
     if (needs_flush) {
-        f = _consensus.flush_log();
+        f = _consensus.flush_log().discard_result();
     }
 }
 
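`flush_log()` now returns `ss::future<consensus::flushed>` instead of `ss::future<>`, so call sites that only await completion chain Seastar's `discard_result()`, which adapts a `future<T>` into a `future<>`. A toy illustration of the idiom — hypothetical example, not taken from this commit:

#include <seastar/core/future.hh>

seastar::future<int> compute() {
    return seastar::make_ready_future<int>(42);
}

seastar::future<> fire_and_wait() {
    // discard_result() drops the int, yielding future<>, so the type
    // matches callers that only care about completion.
    return compute().discard_result();
}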
85 changes: 54 additions & 31 deletions src/v/raft/consensus.cc
@@ -1960,7 +1960,7 @@ consensus::do_append_entries(append_entries_request&& r) {
     }
     auto f = ss::now();
     if (r.is_flush_required() && lstats.dirty_offset > _flushed_offset) {
-        f = flush_log();
+        f = flush_log().discard_result();
     }
     auto last_visible = std::min(
       lstats.dirty_offset, request_metadata.last_visible_index);
@@ -2597,36 +2597,36 @@ append_entries_reply consensus::make_append_entries_reply(
     return reply;
 }
 
-ss::future<> consensus::flush_log() {
-    if (!_has_pending_flushes) {
-        return ss::now();
+ss::future<consensus::flushed> consensus::flush_log() {
+    if (!has_pending_flushes()) {
+        co_return flushed::no;
     }
-    _probe->log_flushed();
-    _has_pending_flushes = false;
     auto flushed_up_to = _log->offsets().dirty_offset;
-    return _log->flush().then([this, flushed_up_to] {
-        auto lstats = _log->offsets();
-        /**
-         * log flush may be interleaved with truncation, hence we need to
-         * check if the log was truncated; if so we do nothing, the flushed
-         * offset will be updated in the truncation path.
-         */
-        if (flushed_up_to > lstats.dirty_offset) {
-            return;
-        }
+    _probe->log_flushed();
+    _not_flushed_bytes = 0;
+    co_await _log->flush();
+    const auto lstats = _log->offsets();
+    /**
+     * log flush may be interleaved with truncation, hence we need to check
+     * if the log was truncated; if so we do nothing, the flushed offset
+     * will be updated in the truncation path.
+     */
+    if (flushed_up_to > lstats.dirty_offset) {
+        co_return flushed::yes;
+    }
 
-        _flushed_offset = std::max(flushed_up_to, _flushed_offset);
-        vlog(_ctxlog.trace, "flushed offset updated: {}", _flushed_offset);
-        // TODO: remove this assertion when we remove committed_offset
-        // from storage.
-        vassert(
-          lstats.committed_offset >= _flushed_offset,
-          "Raft incorrectly tracking flushed log offset. Expected offset: {}, "
-          " current log offsets: {}, log: {}",
-          _flushed_offset,
-          lstats,
-          _log);
-    });
+    _flushed_offset = std::max(flushed_up_to, _flushed_offset);
+    vlog(_ctxlog.trace, "flushed offset updated: {}", _flushed_offset);
+    // TODO: remove this assertion when we remove committed_offset
+    // from storage.
+    vassert(
+      lstats.committed_offset >= _flushed_offset,
+      "Raft incorrectly tracking flushed log offset. Expected offset: {}, "
+      " current log offsets: {}, log: {}",
+      _flushed_offset,
+      lstats,
+      _log);
+    co_return flushed::yes;
 }
 
 ss::future<storage::append_result> consensus::disk_append(
@@ -2678,7 +2678,7 @@ ss::future<storage::append_result> consensus::disk_append(
      */
         _last_quorum_replicated_index = ret.last_offset;
     }
-    _has_pending_flushes = true;
+    _not_flushed_bytes += ret.byte_size;
     // TODO
     // if we rolled a log segment, write current configuration
     // for speedy recovery in the background
@@ -2783,8 +2783,8 @@ ss::future<> consensus::refresh_commit_index() {
     return _op_lock.get_units()
       .then([this](ssx::semaphore_units u) mutable {
           auto f = ss::now();
-          if (_has_pending_flushes) {
-              f = flush_log();
+          if (has_pending_flushes()) {
+              f = flush_log().discard_result();
           }
 
           if (!is_elected_leader()) {
@@ -3832,4 +3832,27 @@ void consensus::upsert_recovery_state(
     }
 }
 
+ss::future<> consensus::maybe_flush_log(size_t threshold_bytes) {
+    // if there is nothing to do, exit without grabbing the op_lock; this
+    // check is sloppy as data can be in flight, but that is ok since the
+    // next check will detect it and flush the log.
+    if (_not_flushed_bytes < threshold_bytes) {
+        co_return;
+    }
+    try {
+        auto holder = _bg.hold();
+        auto u = co_await _op_lock.get_units();
+        auto flushed = co_await flush_log();
+        if (flushed && is_leader()) {
+            for (auto& [id, idx] : _fstats) {
+                // force a full heartbeat to move the committed index forward
+                idx.last_sent_protocol_meta.reset();
+            }
+        }
+    } catch (const ss::gate_closed_exception&) {
+    } catch (const ss::broken_semaphore&) {
+        // ignore the exception, the group is shutting down.
+    }
+}
+
 } // namespace raft
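Two details of `maybe_flush_log` are worth noting. First, the threshold check runs before any lock is taken: a stale read of `_not_flushed_bytes` can only delay a flush by one timer tick, so skipping the `op_lock` in the common case is safe. Second, after a successful flush on the leader, clearing each follower's `last_sent_protocol_meta` presumably invalidates the cached metadata that allows lightweight heartbeats, so the next heartbeat is a full one carrying the advanced committed index to followers, as the in-code comment suggests. A stripped-down sketch of the check-then-lock shape — hypothetical code, assuming only Seastar's gate:

#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>

seastar::future<>
maybe_do_work(size_t pending, size_t threshold, seastar::gate& bg) {
    if (pending < threshold) {
        // fast path: no lock taken; a stale value is fine, the next
        // periodic check will observe it
        co_return;
    }
    try {
        auto holder = bg.hold(); // keeps shutdown from racing with the work
        // ... acquire the op lock and do the expensive work here ...
    } catch (const seastar::gate_closed_exception&) {
        // shutting down: drop the work
    }
}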
12 changes: 9 additions & 3 deletions src/v/raft/consensus.h
@@ -355,7 +355,7 @@ class consensus {
         return _received_snapshot_index;
     }
     size_t received_snapshot_bytes() const { return _received_snapshot_bytes; }
-    bool has_pending_flushes() const { return _has_pending_flushes; }
+    bool has_pending_flushes() const { return _not_flushed_bytes > 0; }
 
     model::offset start_offset() const {
         return model::next_offset(_last_snapshot_index);
@@ -503,6 +503,11 @@ class consensus {
     get_follower_recovery_state() const {
         return _follower_recovery_state;
     }
+    /**
+     * Flushes the underlying log only if there are more unflushed bytes
+     * than the requested threshold.
+     */
+    ss::future<> maybe_flush_log(size_t threshold_bytes);
 
 private:
     friend replicate_entries_stm;
@@ -598,7 +603,8 @@ class consensus {
     void trigger_leadership_notification();
 
     /// \brief _does not_ hold the lock.
-    ss::future<> flush_log();
+    using flushed = ss::bool_class<struct flushed_executed_tag>;
+    ss::future<flushed> flush_log();
 
     void maybe_step_down();
 
@@ -794,7 +800,7 @@ class consensus {
     follower_stats _fstats;
 
     replicate_batcher _batcher;
-    bool _has_pending_flushes{false};
+    size_t _not_flushed_bytes{0};
 
     /// used to wait for background ops before shutting down
     ss::gate _bg;
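The `flushed` alias uses Seastar's `bool_class`, a tag-typed boolean that cannot be silently mixed with other booleans or integers, yet still converts to `bool` in conditions — exactly what `maybe_flush_log`'s `if (flushed && is_leader())` relies on. A small self-contained illustration of the idiom, not from this commit:

#include <seastar/util/bool_class.hh>

#include <type_traits>

using flushed = seastar::bool_class<struct flushed_tag>;

flushed flush_once() { return flushed::yes; }

int main() {
    auto f = flush_once();
    if (f) { // explicit operator bool works in boolean contexts
        // react to a flush actually having happened
    }
    // distinct tag types are not interchangeable, unlike a plain bool
    static_assert(!std::is_convertible_v<flushed, int>);
    return 0;
}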
78 changes: 58 additions & 20 deletions src/v/raft/group_manager.cc
@@ -51,17 +51,30 @@ group_manager::group_manager(
       _configuration.recovery_concurrency_per_shard,
       _configuration.heartbeat_interval)
   , _feature_table(feature_table.local())
+  , _flush_timer_jitter(_configuration.flush_timer_interval_ms)
   , _is_ready(false) {
+    _flush_timer.set_callback([this] {
+        ssx::spawn_with_gate(_gate, [this] {
+            return flush_groups().finally([this] {
+                if (_gate.is_closed()) {
+                    return;
+                }
+                _flush_timer.arm(_flush_timer_jitter());
+            });
+        });
+    });
     setup_metrics();
 }
 
 ss::future<> group_manager::start() {
     co_await _heartbeats.start();
     co_await _recovery_scheduler.start();
+    _flush_timer.arm(_flush_timer_jitter());
 }
 
 ss::future<> group_manager::stop() {
     auto f = _gate.close();
+    _flush_timer.cancel();
 
     f = f.then([this] { return _recovery_scheduler.stop(); });

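The timer wiring follows a common Seastar pattern for periodic background work: the callback runs the work inside the gate, re-arms the timer (with jitter) only after the work completes so ticks never overlap, and `stop()` closes the gate before cancelling the timer. A stripped-down sketch of the same shape with hypothetical names — the real code uses `ssx::spawn_with_gate` and `timeout_jitter`:

#include <seastar/core/gate.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/timer.hh>

using namespace std::chrono_literals;

class periodic_flusher {
    seastar::timer<> _timer;
    seastar::gate _gate;

    seastar::future<> do_flush() { return seastar::sleep(1ms); } // stand-in

public:
    periodic_flusher() {
        _timer.set_callback([this] {
            if (_gate.is_closed()) {
                return; // raced with stop(); do not start new work
            }
            (void)seastar::with_gate(_gate, [this] {
                return do_flush().finally([this] {
                    if (!_gate.is_closed()) {
                        _timer.arm(100ms); // re-arm only while running
                    }
                });
            });
        });
    }
    void start() { _timer.arm(100ms); }
    seastar::future<> stop() {
        auto f = _gate.close(); // pending work drains; callbacks become no-ops
        _timer.cancel();
        return f;
    }
};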
@@ -119,11 +132,12 @@ ss::future<ss::lw_shared_ptr<raft::consensus>> group_manager::create_group(
       _feature_table,
       _is_ready ? std::nullopt : std::make_optional(min_voter_priority),
       keep_snapshotted_log);
 
-    return ss::with_gate(_gate, [this, raft] {
-        return _heartbeats.register_group(raft).then([this, raft] {
-            _groups.push_back(raft);
-            return raft;
+    return _groups_mutex.with([this, raft = std::move(raft)] {
+        return ss::with_gate(_gate, [this, raft] {
+            return _heartbeats.register_group(raft).then([this, raft] {
+                _groups.push_back(raft);
+                return raft;
+            });
         });
     });
 }
@@ -158,24 +172,30 @@ raft::group_configuration group_manager::create_initial_configuration(
 }
 
 ss::future<> group_manager::remove(ss::lw_shared_ptr<raft::consensus> c) {
-    return c->stop()
-      .then([c] { return c->remove_persistent_state(); })
-      .then(
-        [this, id = c->group()] { return _heartbeats.deregister_group(id); })
-      .finally([this, c] {
-          _groups.erase(
-            std::remove(_groups.begin(), _groups.end(), c), _groups.end());
-      });
+    return _groups_mutex.with([this, c = std::move(c)] {
+        return c->stop()
+          .then([c] { return c->remove_persistent_state(); })
+          .then([this, id = c->group()] {
+              return _heartbeats.deregister_group(id);
+          })
+          .finally([this, c] {
+              _groups.erase(
+                std::remove(_groups.begin(), _groups.end(), c), _groups.end());
+          });
+    });
 }
 
 ss::future<> group_manager::shutdown(ss::lw_shared_ptr<raft::consensus> c) {
-    return c->stop()
-      .then(
-        [this, id = c->group()] { return _heartbeats.deregister_group(id); })
-      .finally([this, c] {
-          _groups.erase(
-            std::remove(_groups.begin(), _groups.end(), c), _groups.end());
-      });
+    return _groups_mutex.with([this, c = std::move(c)] {
+        return c->stop()
+          .then([this, id = c->group()] {
+              return _heartbeats.deregister_group(id);
+          })
+          .finally([this, c] {
+              _groups.erase(
+                std::remove(_groups.begin(), _groups.end(), c), _groups.end());
+          });
+    });
 }
 
 void group_manager::trigger_leadership_notification(
@@ -203,4 +223,22 @@ void group_manager::setup_metrics() {
           sm::description("Number of raft groups"))});
 }
 
+ss::future<> group_manager::flush_groups() {
+    /**
+     * Assumes that the gate is already held
+     */
+    auto u = co_await _groups_mutex.get_units();
+
+    co_await ss::max_concurrent_for_each(
+      _groups.begin(),
+      _groups.end(),
+      32,
+      [this](const ss::lw_shared_ptr<consensus>& raft) {
+          if (_configuration.replica_max_not_flushed_bytes()) {
+              return raft->maybe_flush_log(
+                _configuration.replica_max_not_flushed_bytes().value());
+          }
+          return ss::now();
+      });
+}
 } // namespace raft
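`ss::max_concurrent_for_each` keeps at most 32 `maybe_flush_log` calls in flight at once, so a shard hosting thousands of partitions does not fan out thousands of concurrent flushes on every tick; note also that leaving `replica_max_not_flushed_bytes` unset disables the byte-based flush entirely. A minimal usage sketch of the loop utility — hypothetical example, not from this commit:

#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>

#include <vector>

using namespace std::chrono_literals;

seastar::future<> flush_all(std::vector<int>& partitions) {
    // At most 8 lambda invocations run concurrently; the returned future
    // resolves once every element has been processed.
    return seastar::max_concurrent_for_each(
      partitions.begin(), partitions.end(), 8, [](int /*partition*/) {
          return seastar::sleep(1ms); // stand-in for maybe_flush_log()
      });
}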
10 changes: 9 additions & 1 deletion src/v/raft/group_manager.h
@@ -16,6 +16,7 @@
 #include "raft/heartbeat_manager.h"
 #include "raft/recovery_memory_quota.h"
 #include "raft/recovery_scheduler.h"
+#include "raft/timeout_jitter.h"
 #include "raft/types.h"
 #include "rpc/fwd.h"
 #include "ssx/metrics.h"
@@ -46,6 +47,8 @@ class group_manager {
         config::binding<bool> enable_lw_heartbeat;
         config::binding<size_t> recovery_concurrency_per_shard;
         config::binding<std::chrono::milliseconds> election_timeout_ms;
+        config::binding<std::optional<size_t>> replica_max_not_flushed_bytes;
+        config::binding<std::chrono::milliseconds> flush_timer_interval_ms;
     };
     using config_provider_fn = ss::noncopyable_function<configuration()>;

@@ -96,9 +99,11 @@ class group_manager {
     void trigger_leadership_notification(raft::leadership_status);
     void setup_metrics();
 
+    ss::future<> flush_groups();
+
     raft::group_configuration create_initial_configuration(
       std::vector<model::broker>, model::revision_id) const;
 
+    mutex _groups_mutex;
     model::node_id _self;
     ss::scheduling_group _raft_sg;
     raft::consensus_client_protocol _client;
@@ -115,6 +120,9 @@ class group_manager {
     recovery_memory_quota _recovery_mem_quota;
     recovery_scheduler _recovery_scheduler;
     features::feature_table& _feature_table;
+    ss::timer<clock_type> _flush_timer;
+    timeout_jitter _flush_timer_jitter;
+
     bool _is_ready;
 };
 
2 changes: 1 addition & 1 deletion src/v/raft/replicate_entries_stm.cc
@@ -54,7 +54,7 @@ ss::future<result<append_entries_reply>> replicate_entries_stm::flush_log() {
     using ret_t = result<append_entries_reply>;
     auto flush_f = ss::now();
     if (_is_flush_required) {
-        flush_f = _ptr->flush_log();
+        flush_f = _ptr->flush_log().discard_result();
     }
 
     auto f = flush_f
(6 more changed files not shown)
