From b2677fe026a0977a3b681f796d6b27199e18c780 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 11 Apr 2024 14:57:35 -0700 Subject: [PATCH 1/7] raft/debug: add time_since_last_flush to debug state --- src/v/cluster/cluster_utils.cc | 1 + src/v/cluster/types.h | 6 ++++-- src/v/redpanda/admin/api-doc/debug.json | 4 ++++ src/v/redpanda/admin/debug.cc | 1 + 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index 499222759e18..2f1ff3346926 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -319,6 +319,7 @@ partition_raft_state get_partition_raft_state(consensus_ptr ptr) { raft_state.write_caching_enabled = ptr->write_caching_enabled(); raft_state.flush_bytes = ptr->flush_bytes(); raft_state.flush_ms = ptr->flush_ms(); + raft_state.time_since_last_flush = ptr->time_since_last_flush(); raft_state.replication_monitor_state = fmt::format( "{}", ptr->get_replication_monitor()); diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index dba8b5eb7cd4..e8deafea4d1e 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -3975,7 +3975,7 @@ struct partition_stm_state struct partition_raft_state : serde::envelope< partition_raft_state, - serde::version<4>, + serde::version<5>, serde::compat_version<0>> { using rpc_adl_exempt = std::true_type; @@ -4001,6 +4001,7 @@ struct partition_raft_state size_t flush_bytes; std::chrono::milliseconds flush_ms; ss::sstring replication_monitor_state; + std::chrono::milliseconds time_since_last_flush; struct follower_state : serde::envelope< @@ -4093,7 +4094,8 @@ struct partition_raft_state write_caching_enabled, flush_bytes, flush_ms, - replication_monitor_state); + replication_monitor_state, + time_since_last_flush); } friend bool diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json index 3a61b2dc88ee..ee6c28d8cd80 100644 --- a/src/v/redpanda/admin/api-doc/debug.json +++ b/src/v/redpanda/admin/api-doc/debug.json @@ -1044,6 +1044,10 @@ "type": "long", "description": "unflushed bytes size flush trigger, used for cached writes." }, + "time_since_last_flush": { + "type": "long", + "description": "milliseconds since the last flush attempt." + }, "followers": { "type": "array", "items": { diff --git a/src/v/redpanda/admin/debug.cc b/src/v/redpanda/admin/debug.cc index e7d5e7d68f4e..3cb9c3f9a163 100644 --- a/src/v/redpanda/admin/debug.cc +++ b/src/v/redpanda/admin/debug.cc @@ -70,6 +70,7 @@ void fill_raft_state( raft_state.write_caching_enabled = src.write_caching_enabled; raft_state.flush_bytes = src.flush_bytes; raft_state.flush_ms = src.flush_ms.count(); + raft_state.time_since_last_flush = src.time_since_last_flush / 1ms; raft_state.replication_monitor_state = src.replication_monitor_state; if (src.followers) { for (const auto& f : *src.followers) { From 94705444aa0a4ccd1543409d83cd91c5ba46810a Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 11 Apr 2024 18:07:18 -0700 Subject: [PATCH 2/7] consensus: update committed index in single participant case In a single participant raft group, commit index is never updated with lower ack levels. Currently commit index recomputation is only done on append entries response which doesn't happen with single replica + lower ack levels. 
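To make the failure mode concrete, a sketch in the style of the raft fixtures
used later in this series (the acks level and helper names are taken from those
fixtures, not from this commit):

    // single-voter group, relaxed acks (leader_ack), write caching enabled
    auto res = co_await raft->replicate(
      make_batches({{"k", "v"}}),
      replicate_options(consistency_level::leader_ack));
    // With one voter there is no append_entries response, so nothing
    // recomputed the committed index after the local flush; waiting for the
    // replicated offset to become committed could stall until some other
    // event forced a recomputation.
    co_await wait_for_committed_offset(res.value().last_offset, 5s);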
(Test added in subsequent commit) --- src/v/raft/consensus.cc | 4 ++ src/v/raft/replicate_entries_stm.cc | 85 +++++++++-------------------- src/v/raft/replicate_entries_stm.h | 4 +- 3 files changed, 33 insertions(+), 60 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 9f56d1994582..4a8c0227bf4d 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -2691,6 +2691,10 @@ ss::future consensus::flush_log() { _flushed_offset = std::max(flushed_up_to, _flushed_offset); vlog(_ctxlog.trace, "flushed offset updated: {}", _flushed_offset); + + maybe_update_majority_replicated_index(); + maybe_update_leader_commit_idx(); + // TODO: remove this assertion when we will remove committed_offset // from storage. vassert( diff --git a/src/v/raft/replicate_entries_stm.cc b/src/v/raft/replicate_entries_stm.cc index c22638a00c78..9c08aa5da876 100644 --- a/src/v/raft/replicate_entries_stm.cc +++ b/src/v/raft/replicate_entries_stm.cc @@ -50,42 +50,13 @@ ss::future replicate_entries_stm::share_batches() { co_return std::move(readers.back()); } -ss::future> replicate_entries_stm::flush_log() { - using ret_t = result; +ss::future<> replicate_entries_stm::flush_log() { auto flush_f = ss::now(); if (_is_flush_required) { flush_f = _ptr->flush_log().discard_result(); } - - auto f = flush_f - .then([this]() { - /** - * Replicate STM _dirty_offset is set to the dirty offset of - * a log after successfull self append. After flush we are - * certain that data to at least `_dirty_offset` were - * flushed. Sampling offset again right before the flush - * isn't necessary since it will not influence result of - * replication process in current `replicate_entries_stm` - * instance. - */ - auto new_committed_offset = _dirty_offset; - append_entries_reply reply; - reply.node_id = _ptr->_self; - reply.target_node_id = _ptr->_self; - reply.group = _ptr->group(); - reply.term = _ptr->term(); - // we just flushed offsets are the same - reply.last_dirty_log_index = new_committed_offset; - reply.last_flushed_log_index = new_committed_offset; - reply.result = reply_result::success; - return ret_t(reply); - }) - .handle_exception( - []([[maybe_unused]] const std::exception_ptr& ex) { - return ret_t(errc::leader_flush_failed); - }); _dispatch_sem.signal(); - return f; + return flush_f; } clock_type::time_point replicate_entries_stm::append_entries_timeout() { @@ -148,38 +119,36 @@ ss::future<> replicate_entries_stm::dispatch_one(vnode id) { return ss::with_gate( _req_bg, [this, id]() mutable { - return dispatch_single_retry(id).then( - [this, id](result reply) { - raft::follower_req_seq seq{0}; - if (id != _ptr->self()) { - auto it = _followers_seq.find(id); - vassert( - it != _followers_seq.end(), - "Follower request sequence is required to exists " - "for each follower. No follower sequence found " - "for {}", - id); - seq = it->second; - } - - if (!reply) { - _ptr->get_probe().replicate_request_error(); - } - _ptr->process_append_entries_reply( - id.id(), reply, seq, _dirty_offset); - }); + return id == _ptr->self() ? 
flush_log() + : dispatch_remote_append_entries(id); }) .handle_exception_type([](const ss::gate_closed_exception&) {}); } -ss::future> -replicate_entries_stm::dispatch_single_retry(vnode id) { - if (id == _ptr->_self) { - return flush_log(); - } - return share_batches().then( - [this, id](model::record_batch_reader batches) mutable { +ss::future<> replicate_entries_stm::dispatch_remote_append_entries(vnode id) { + vassert( + id != _ptr->_self, + "Incorrect remote entries dispatch for local node: {}", + id); + return share_batches() + .then([this, id](model::record_batch_reader batches) mutable { return send_append_entries_request(id, std::move(batches)); + }) + .then([this, id](result reply) { + raft::follower_req_seq seq{0}; + auto it = _followers_seq.find(id); + vassert( + it != _followers_seq.end(), + "Follower request sequence is required to exists " + "for each follower. No follower sequence found " + "for {}", + id); + seq = it->second; + if (!reply) { + _ptr->get_probe().replicate_request_error(); + } + _ptr->process_append_entries_reply( + id.id(), reply, seq, _dirty_offset); }); } diff --git a/src/v/raft/replicate_entries_stm.h b/src/v/raft/replicate_entries_stm.h index 6c236c362994..a6a43abe395e 100644 --- a/src/v/raft/replicate_entries_stm.h +++ b/src/v/raft/replicate_entries_stm.h @@ -99,8 +99,8 @@ class replicate_entries_stm { ss::future share_batches(); ss::future<> dispatch_one(vnode); - ss::future> dispatch_single_retry(vnode); - ss::future> flush_log(); + ss::future<> dispatch_remote_append_entries(vnode); + ss::future<> flush_log(); ss::future> send_append_entries_request(vnode, model::record_batch_reader); From 0055d789fdf44d650de359985352e7eee38852f3 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 10 Apr 2024 12:47:26 -0700 Subject: [PATCH 3/7] write_caching/config: rename configuration to write_caching_default New configuration name is write_caching_default. Accepted values = {true, false, disabled} true = write caching is true by default for all topics false = write caching is false by default for all topics disabled = write caching is disabled for all topics even when property overrides are present. 
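In code, the renamed modes (condensed from the model/metadata.h hunk further
down in this patch):

    enum class write_caching_mode : uint8_t {
        default_true = 0,  // write caching on by default for all topics
        default_false = 1, // write caching off by default for all topics
        disabled = 2       // kill switch: off everywhere, overrides ignored
    };
    // accepted string forms: "true", "false", "disabled"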
Switching to true/false to be consistent with other configurations --- src/go/rpk/pkg/cli/cluster/config/import.go | 7 +++ src/go/rpk/pkg/cli/redpanda/start.go | 4 +- src/go/rpk/pkg/cli/redpanda/start_test.go | 8 ++-- src/v/cluster/metadata_cache.cc | 2 +- .../tests/topic_properties_generator.h | 4 +- src/v/compat/cluster_generator.h | 4 +- src/v/config/configuration.cc | 12 ++--- src/v/config/configuration.h | 2 +- .../handlers/configs/config_response_utils.cc | 9 ++-- .../server/handlers/configs/config_utils.h | 7 +-- .../kafka/server/handlers/topics/validators.h | 2 +- src/v/kafka/server/tests/alter_config_test.cc | 32 ++++++------- src/v/model/metadata.h | 20 ++++---- src/v/model/model.cc | 10 ++-- src/v/raft/tests/raft_fixture.cc | 9 ++-- src/v/raft/tests/simple_raft_fixture.h | 2 +- src/v/redpanda/application.cc | 3 +- src/v/storage/ntp_config.h | 5 +- tests/rptest/tests/describe_topics_test.py | 2 +- tests/rptest/tests/simple_e2e_test.py | 4 +- .../rptest/tests/write_caching_fi_e2e_test.py | 2 +- tests/rptest/tests/write_caching_fi_test.py | 2 +- tests/rptest/tests/write_caching_test.py | 48 ++++++++++--------- 23 files changed, 109 insertions(+), 91 deletions(-) diff --git a/src/go/rpk/pkg/cli/cluster/config/import.go b/src/go/rpk/pkg/cli/cluster/config/import.go index 07ae94f2d94f..5e10994e0be6 100644 --- a/src/go/rpk/pkg/cli/cluster/config/import.go +++ b/src/go/rpk/pkg/cli/cluster/config/import.go @@ -122,6 +122,13 @@ func importConfig( if vInt, ok := v.(int); ok { v = float64(vInt) } + } else if meta.Type == "string" { + // Some boolean configurations are inherently strings + // in which case we type switch to string for correct comparison + // below. yaml loses this type information during export. + if vBool, ok := v.(bool); ok { + v = fmt.Sprintf("%v", vBool) + } } else if meta.Type == "array" && meta.Items.Type == "string" { switch vArray := v.(type) { case []interface{}: diff --git a/src/go/rpk/pkg/cli/redpanda/start.go b/src/go/rpk/pkg/cli/redpanda/start.go index 228a097645c4..feb5c5e435e8 100644 --- a/src/go/rpk/pkg/cli/redpanda/start.go +++ b/src/go/rpk/pkg/cli/redpanda/start.go @@ -961,7 +961,7 @@ func setContainerModeCfgFields(y *config.RedpandaYaml) { y.Redpanda.Other["fetch_reads_debounce_timeout"] = 10 y.Redpanda.Other["group_initial_rebalance_delay"] = 0 y.Redpanda.Other["log_segment_size_min"] = 1 - y.Redpanda.Other["write_caching"] = "on" + y.Redpanda.Other["write_caching_default"] = "true" } func getOrFindInstallDir(fs afero.Fs, installDir string) (string, error) { @@ -992,7 +992,7 @@ environments: * fetch_reads_debounce_timeout: 10 * group_initial_rebalance_delay: 0 * log_segment_size_min: 1 - * write_caching: on + * write_caching_default: true After redpanda starts you can modify the cluster properties using: rpk config set ` diff --git a/src/go/rpk/pkg/cli/redpanda/start_test.go b/src/go/rpk/pkg/cli/redpanda/start_test.go index d2215c26807c..2fec8ca1f31e 100644 --- a/src/go/rpk/pkg/cli/redpanda/start_test.go +++ b/src/go/rpk/pkg/cli/redpanda/start_test.go @@ -251,7 +251,7 @@ func TestStartCommand(t *testing.T) { "fetch_reads_debounce_timeout": 10, "group_initial_rebalance_delay": 0, "log_segment_size_min": 1, - "write_caching": "on", + "write_caching_default": "true", } expYAML, err := yaml.Marshal(c) require.NoError(st, err) @@ -1537,7 +1537,7 @@ func TestStartCommand(t *testing.T) { "fetch_reads_debounce_timeout": 10, "group_initial_rebalance_delay": 0, "log_segment_size_min": 1, - "write_caching": "on", + "write_caching_default": "true", } 
require.Equal(st, expectedClusterFields, y.Redpanda.Other) }, @@ -1590,7 +1590,7 @@ func TestStartCommand(t *testing.T) { "fetch_reads_debounce_timeout": 10, "group_initial_rebalance_delay": 0, "log_segment_size_min": 1, - "write_caching": "on", + "write_caching_default": "true", } require.Nil(st, y.Redpanda.ID) require.Equal(st, true, y.Redpanda.DeveloperMode) @@ -1634,7 +1634,7 @@ func TestStartCommand(t *testing.T) { "fetch_reads_debounce_timeout": 10, "group_initial_rebalance_delay": 0, "log_segment_size_min": 1, - "write_caching": "on", + "write_caching_default": "true", } require.Exactly(st, expectedClusterFields, y.Redpanda.Other) }, diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 42e4b6d42826..08668c0a263a 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -362,7 +362,7 @@ metadata_cache::get_topic_write_caching_mode( return std::nullopt; } if ( - config::shard_local_cfg().write_caching() + config::shard_local_cfg().write_caching_default() == model::write_caching_mode::disabled) { return model::write_caching_mode::disabled; } diff --git a/src/v/cluster/tests/topic_properties_generator.h b/src/v/cluster/tests/topic_properties_generator.h index 9a0795cba132..7b0ab44c1490 100644 --- a/src/v/cluster/tests/topic_properties_generator.h +++ b/src/v/cluster/tests/topic_properties_generator.h @@ -65,8 +65,8 @@ inline cluster::topic_properties random_topic_properties() { properties.remote_delete = false; properties.write_caching = tests::random_optional([] { return random_generators::random_choice( - {model::write_caching_mode::on, - model::write_caching_mode::off, + {model::write_caching_mode::default_true, + model::write_caching_mode::default_false, model::write_caching_mode::disabled}); }); properties.flush_ms = tests::random_optional( diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index df368bc40c6c..ab6a8fddb247 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -642,8 +642,8 @@ struct instance_generator { [] { return model::vcluster_id(random_xid()); }), tests::random_optional([] { return random_generators::random_choice( - {model::write_caching_mode::on, - model::write_caching_mode::off, + {model::write_caching_mode::default_true, + model::write_caching_mode::default_false, model::write_caching_mode::disabled}); }), tests::random_optional([] { return tests::random_duration_ms(); }), diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 4fa14cff60d1..c966fe4a7050 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -968,20 +968,20 @@ configuration::configuration() "one follower", {.visibility = visibility::tunable}, 16) - , write_caching( + , write_caching_default( *this, - "write_caching", + "write_caching_default", "Cache batches until the segment appender chunk is full instead of " "flushing for every acks=all write. This is the global default " "for all topics and can be overriden at a topic scope with property " "write.caching. 
'disabled' mode takes precedence over topic overrides " "and disables the feature altogether for the entire cluster.", {.needs_restart = needs_restart::no, - .example = "on", + .example = "true", .visibility = visibility::user}, - model::write_caching_mode::off, - {model::write_caching_mode::on, - model::write_caching_mode::off, + model::write_caching_mode::default_false, + {model::write_caching_mode::default_true, + model::write_caching_mode::default_false, model::write_caching_mode::disabled}) , reclaim_min_size( *this, diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 73260863689c..d232b546a042 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -201,7 +201,7 @@ struct configuration final : public config_store { property raft_recovery_throttle_disable_dynamic_mode; property> raft_smp_max_non_local_requests; property raft_max_concurrent_append_requests_per_follower; - enum_property write_caching; + enum_property write_caching_default; property reclaim_min_size; property reclaim_max_size; diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index fc1cbaf703c3..9c653888c29d 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -595,16 +595,17 @@ config_response_container_t make_topic_configs( add_topic_config_if_requested( config_keys, result, - config::shard_local_cfg().write_caching.name(), - config::shard_local_cfg().write_caching(), + config::shard_local_cfg().write_caching_default.name(), + config::shard_local_cfg().write_caching_default(), topic_property_write_caching, - (config::shard_local_cfg().write_caching() + (config::shard_local_cfg().write_caching_default() == model::write_caching_mode::disabled) ? model::write_caching_mode::disabled : topic_properties.write_caching, include_synonyms, maybe_make_documentation( - include_documentation, config::shard_local_cfg().write_caching.desc()), + include_documentation, + config::shard_local_cfg().write_caching_default.desc()), &describe_as_string); add_topic_config_if_requested( diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 65570ca8b222..f18c75833daa 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -371,10 +371,11 @@ struct write_caching_config_validator { "Invalid value {} for {}, accepted values: [{}, {}]", value, topic_property_write_caching, - model::write_caching_mode::on, - model::write_caching_mode::off); + model::write_caching_mode::default_true, + model::write_caching_mode::default_false); } - auto cluster_default = config::shard_local_cfg().write_caching(); + auto cluster_default + = config::shard_local_cfg().write_caching_default(); if (cluster_default == model::write_caching_mode::disabled) { return fmt::format("write caching disabled at cluster level"); } diff --git a/src/v/kafka/server/handlers/topics/validators.h b/src/v/kafka/server/handlers/topics/validators.h index 3e83761b2a36..a0cce4b0767a 100644 --- a/src/v/kafka/server/handlers/topics/validators.h +++ b/src/v/kafka/server/handlers/topics/validators.h @@ -297,7 +297,7 @@ struct write_caching_configs_validator { return mode != model::write_caching_mode::disabled; } // write caching cannot be turned on for internal topics. 
- return mode != model::write_caching_mode::on; + return mode != model::write_caching_mode::default_true; } static bool validate_flush_ms(const creatable_topic& c) { diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 3f68349a219f..19a8bf310ea7 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -454,7 +454,7 @@ FIXTURE_TEST(test_alter_single_topic_config, alter_config_test_fixture) { properties.emplace("cleanup.policy", "compact"); properties.emplace("redpanda.remote.read", "true"); properties.emplace("replication.factor", "1"); - properties.emplace("write.caching", "on"); + properties.emplace("write.caching", "true"); properties.emplace("flush.ms", "225"); properties.emplace("flush.bytes", "32468"); @@ -485,14 +485,14 @@ FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) { properties_1.emplace("retention.ms", "1234"); properties_1.emplace("cleanup.policy", "compact"); properties_1.emplace("replication.factor", "1"); - properties_1.emplace("write.caching", "on"); + properties_1.emplace("write.caching", "true"); properties_1.emplace("flush.ms", "225"); properties_1.emplace("flush.bytes", "32468"); absl::flat_hash_map properties_2; properties_2.emplace("retention.bytes", "4096"); properties_2.emplace("replication.factor", "1"); - properties_2.emplace("write.caching", "off"); + properties_2.emplace("write.caching", "false"); properties_2.emplace("flush.ms", "100"); properties_2.emplace("flush.bytes", "990"); @@ -514,13 +514,13 @@ FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) { topic_1, "retention.ms", fmt::format("{}", 1234ms), describe_resp_1); assert_property_value( topic_1, "cleanup.policy", "compact", describe_resp_1); - assert_property_value(topic_1, "write.caching", "on", describe_resp_1); + assert_property_value(topic_1, "write.caching", "true", describe_resp_1); assert_property_value(topic_1, "flush.ms", "225", describe_resp_1); assert_property_value(topic_1, "flush.bytes", "32468", describe_resp_1); auto describe_resp_2 = describe_configs(topic_2); assert_property_value(topic_2, "retention.bytes", "4096", describe_resp_2); - assert_property_value(topic_2, "write.caching", "off", describe_resp_2); + assert_property_value(topic_2, "write.caching", "false", describe_resp_2); assert_property_value(topic_2, "flush.ms", "100", describe_resp_2); assert_property_value(topic_2, "flush.bytes", "990", describe_resp_2); } @@ -581,7 +581,7 @@ FIXTURE_TEST( absl::flat_hash_map properties; properties.emplace("retention.ms", "1234"); properties.emplace("replication.factor", "1"); - properties.emplace("write.caching", "on"); + properties.emplace("write.caching", "true"); properties.emplace("flush.ms", "225"); properties.emplace("flush.bytes", "32468"); @@ -596,7 +596,7 @@ FIXTURE_TEST( auto describe_resp = describe_configs(test_tp); assert_property_value( test_tp, "retention.ms", fmt::format("{}", 1234ms), describe_resp); - assert_property_value(test_tp, "write.caching", "on", describe_resp); + assert_property_value(test_tp, "write.caching", "true", describe_resp); assert_property_value(test_tp, "flush.ms", "225", describe_resp); assert_property_value(test_tp, "flush.bytes", "32468", describe_resp); @@ -606,7 +606,7 @@ FIXTURE_TEST( absl::flat_hash_map new_properties; new_properties.emplace("retention.bytes", "4096"); new_properties.emplace("replication.factor", "1"); - new_properties.emplace("write.caching", "off"); + 
new_properties.emplace("write.caching", "false"); new_properties.emplace("flush.ms", "9999"); new_properties.emplace("flush.bytes", "8888"); @@ -622,7 +622,7 @@ FIXTURE_TEST( new_describe_resp); assert_property_value( test_tp, "retention.bytes", "4096", new_describe_resp); - assert_property_value(test_tp, "write.caching", "off", new_describe_resp); + assert_property_value(test_tp, "write.caching", "false", new_describe_resp); assert_property_value(test_tp, "flush.ms", "9999", new_describe_resp); assert_property_value(test_tp, "flush.bytes", "8888", new_describe_resp); } @@ -641,7 +641,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { std::make_pair("1234", kafka::config_resource_operation::set)); properties.emplace( "write.caching", - std::make_pair("on", kafka::config_resource_operation::set)); + std::make_pair("true", kafka::config_resource_operation::set)); properties.emplace( "flush.ms", std::make_pair("1234", kafka::config_resource_operation::set)); @@ -660,7 +660,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { auto describe_resp = describe_configs(test_tp); assert_property_value( test_tp, "retention.ms", fmt::format("{}", 1234ms), describe_resp); - assert_property_value(test_tp, "write.caching", "on", describe_resp); + assert_property_value(test_tp, "write.caching", "true", describe_resp); assert_property_value(test_tp, "flush.ms", "1234", describe_resp); assert_property_value(test_tp, "flush.bytes", "5678", describe_resp); @@ -676,7 +676,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { std::pair{"4096", kafka::config_resource_operation::set}); new_properties.emplace( "write.caching", - std::make_pair("off", kafka::config_resource_operation::set)); + std::make_pair("false", kafka::config_resource_operation::set)); incremental_alter_configs( {make_incremental_alter_topic_config_resource(test_tp, new_properties)}); @@ -687,7 +687,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { test_tp, "retention.ms", fmt::format("{}", 1234ms), new_describe_resp); assert_property_value( test_tp, "retention.bytes", "4096", new_describe_resp); - assert_property_value(test_tp, "write.caching", "off", new_describe_resp); + assert_property_value(test_tp, "write.caching", "false", new_describe_resp); assert_property_value(test_tp, "flush.ms", "1234", new_describe_resp); assert_property_value(test_tp, "flush.bytes", "5678", new_describe_resp); } @@ -728,7 +728,7 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { std::make_pair("1234", kafka::config_resource_operation::set)); properties.emplace( "write.caching", - std::make_pair("on", kafka::config_resource_operation::set)); + std::make_pair("true", kafka::config_resource_operation::set)); properties.emplace( "flush.ms", std::make_pair("9999", kafka::config_resource_operation::set)); @@ -747,7 +747,7 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { auto describe_resp = describe_configs(test_tp); assert_property_value( test_tp, "retention.ms", fmt::format("{}", 1234ms), describe_resp); - assert_property_value(test_tp, "write.caching", "on", describe_resp); + assert_property_value(test_tp, "write.caching", "true", describe_resp); assert_property_value(test_tp, "flush.ms", "9999", describe_resp); assert_property_value(test_tp, "flush.bytes", "8888", describe_resp); @@ -789,7 +789,7 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { 
assert_property_value( test_tp, "write.caching", - fmt::format("{}", config::shard_local_cfg().write_caching()), + fmt::format("{}", config::shard_local_cfg().write_caching_default()), new_describe_resp); assert_property_value( test_tp, diff --git a/src/v/model/metadata.h b/src/v/model/metadata.h index 29cc49708535..bcd83b9b172a 100644 --- a/src/v/model/metadata.h +++ b/src/v/model/metadata.h @@ -555,21 +555,21 @@ using vcluster_id = named_type; * Type that represents the cluster wide write caching mode. */ enum class write_caching_mode : uint8_t { - // on by default for all topics - on = 0, - // off by default for all topics - off = 1, - // disabled by default clusterwide. - // cannot be overriden at topic level. + // true by default for all topics + default_true = 0, + // false by default for all topics + default_false = 1, + // disabled across all topics even for those + // with overrides. kill switch. disabled = 2 }; constexpr const char* write_caching_mode_to_string(write_caching_mode s) { switch (s) { - case write_caching_mode::on: - return "on"; - case write_caching_mode::off: - return "off"; + case write_caching_mode::default_true: + return "true"; + case write_caching_mode::default_false: + return "false"; case write_caching_mode::disabled: return "disabled"; default: diff --git a/src/v/model/model.cc b/src/v/model/model.cc index 43491d5cbc57..6890b127be3d 100644 --- a/src/v/model/model.cc +++ b/src/v/model/model.cc @@ -506,11 +506,13 @@ std::optional write_caching_mode_from_string(std::string_view s) { return string_switch>(s) .match( - model::write_caching_mode_to_string(model::write_caching_mode::on), - model::write_caching_mode::on) + model::write_caching_mode_to_string( + model::write_caching_mode::default_true), + model::write_caching_mode::default_true) .match( - model::write_caching_mode_to_string(model::write_caching_mode::off), - model::write_caching_mode::off) + model::write_caching_mode_to_string( + model::write_caching_mode::default_false), + model::write_caching_mode::default_false) .match( model::write_caching_mode_to_string( model::write_caching_mode::disabled), diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index 608ca278b625..aa3e995d9b57 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -714,10 +714,11 @@ ss::future<> raft_fixture::reset_background_flushing() const { } ss::future<> raft_fixture::set_write_caching(bool value) const { - auto mode = value ? model::write_caching_mode::on - : model::write_caching_mode::off; - co_await ss::smp::invoke_on_all( - [mode]() { config::shard_local_cfg().write_caching.set_value(mode); }); + auto mode = value ? 
model::write_caching_mode::default_true + : model::write_caching_mode::default_false; + co_await ss::smp::invoke_on_all([mode]() { + config::shard_local_cfg().write_caching_default.set_value(mode); + }); notify_replicas_on_config_change(); } diff --git a/src/v/raft/tests/simple_raft_fixture.h b/src/v/raft/tests/simple_raft_fixture.h index e868ae841808..be0516a78cee 100644 --- a/src/v/raft/tests/simple_raft_fixture.h +++ b/src/v/raft/tests/simple_raft_fixture.h @@ -84,7 +84,7 @@ struct simple_raft_fixture { = config::mock_binding(64), .election_timeout_ms = config::mock_binding(10ms), .write_caching = config::mock_binding( - model::write_caching_mode::off), + model::write_caching_mode::default_false), .write_caching_flush_ms = config::mock_binding(100ms), .write_caching_flush_bytes = config::mock_binding>(std::nullopt)}; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index b61cbf8cafd8..5447d5510f18 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1333,7 +1333,8 @@ void application::wire_up_redpanda_services( .raft_recovery_concurrency_per_shard.bind(), .election_timeout_ms = config::shard_local_cfg().raft_election_timeout_ms.bind(), - .write_caching = config::shard_local_cfg().write_caching.bind(), + .write_caching + = config::shard_local_cfg().write_caching_default.bind(), .write_caching_flush_ms = config::shard_local_cfg() .raft_replica_max_flush_delay_ms.bind(), diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index a209b1865bde..85fb54a2e5b3 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -265,14 +265,15 @@ class ntp_config { if (!model::is_user_topic(_ntp)) { return false; } - auto cluster_default = config::shard_local_cfg().write_caching(); + auto cluster_default + = config::shard_local_cfg().write_caching_default(); if (cluster_default == model::write_caching_mode::disabled) { return false; } auto value = _overrides ? _overrides->write_caching.value_or(cluster_default) : cluster_default; - return value == model::write_caching_mode::on; + return value == model::write_caching_mode::default_true; } std::chrono::milliseconds flush_ms() const { diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index 49937d51f0d7..bea1e6bf1211 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -223,7 +223,7 @@ def test_describe_topics_with_documentation_and_types(self): "write.caching": ConfigProperty( config_type="STRING", - value="off", + value="false", doc_string= "Cache batches until the segment appender chunk is full instead of " "flushing for every acks=all write. 
This is the global default " diff --git a/tests/rptest/tests/simple_e2e_test.py b/tests/rptest/tests/simple_e2e_test.py index e0ad80c6ece4..24a8f690d480 100644 --- a/tests/rptest/tests/simple_e2e_test.py +++ b/tests/rptest/tests/simple_e2e_test.py @@ -101,7 +101,7 @@ def inject_failures(): node=node)) time.sleep(5) - (acks, wc_conf) = (-1, "on") if write_caching else (1, "off") + (acks, wc_conf) = (-1, "true") if write_caching else (1, "false") # use small segment size to enable log eviction self.start_redpanda(num_nodes=3, si_settings=SISettings( @@ -114,7 +114,7 @@ def inject_failures(): 5242880, "default_topic_replications": 3, - "write_caching": + "write_caching_default": wc_conf, "raft_replica_max_pending_flush_bytes": 1024 * 1024 * 1024 * 1024, diff --git a/tests/rptest/tests/write_caching_fi_e2e_test.py b/tests/rptest/tests/write_caching_fi_e2e_test.py index ab60000cd63d..0fcfb3a1d5a2 100644 --- a/tests/rptest/tests/write_caching_fi_e2e_test.py +++ b/tests/rptest/tests/write_caching_fi_e2e_test.py @@ -29,7 +29,7 @@ class WriteCachingFailureInjectionE2ETest(RedpandaTest): def __init__(self, test_context): extra_rp_conf = dict( - write_caching="on", + write_caching_default="true", # Make data loss more likely by allowing more data to reside in # memory only. diff --git a/tests/rptest/tests/write_caching_fi_test.py b/tests/rptest/tests/write_caching_fi_test.py index 541c36419a30..b9ea2f947ac0 100644 --- a/tests/rptest/tests/write_caching_fi_test.py +++ b/tests/rptest/tests/write_caching_fi_test.py @@ -22,7 +22,7 @@ class WriteCachingFailureInjectionTest(RedpandaTest): def __init__(self, test_context): extra_rp_conf = dict( - write_caching="on", + write_caching_default="true", # Make data loss more likely by allowing more data to reside in # memory only. 
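For reference, the precedence exercised by the write caching tests below,
restated from storage::ntp_config::write_caching() in the hunk above
(illustrative helper only, not part of the patch; the real implementation
additionally forces the mode off for internal topics):

    bool effective_write_caching(
      model::write_caching_mode cluster_default,
      std::optional<model::write_caching_mode> topic_override) {
        if (cluster_default == model::write_caching_mode::disabled) {
            return false; // kill switch wins over any topic override
        }
        return topic_override.value_or(cluster_default)
               == model::write_caching_mode::default_true;
    }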
diff --git a/tests/rptest/tests/write_caching_test.py b/tests/rptest/tests/write_caching_test.py index 3269c5c278f7..543d060b61db 100644 --- a/tests/rptest/tests/write_caching_test.py +++ b/tests/rptest/tests/write_caching_test.py @@ -20,12 +20,12 @@ # no StrEnum support in test python version class WriteCachingMode(str, Enum): - ON = "on" - OFF = "off" + TRUE = "true" + FALSE = "false" DISABLED = "disabled" def __bool__(self): - return self.value == self.ON + return self.value == self.TRUE def __str__(self): return self.value @@ -105,7 +105,7 @@ def set_topic_properties(self, key: str, value): @cluster(num_nodes=3) def test_properties(self): - write_caching_conf = "write_caching" + write_caching_conf = "write_caching_default" flush_ms_conf = "raft_replica_max_flush_delay_ms" flush_bytes_conf = "raft_replica_max_pending_flush_bytes" @@ -118,12 +118,12 @@ def test_properties(self): partitions=1, replicas=3, config={ - TopicSpec.PROPERTY_WRITE_CACHING: "on", + TopicSpec.PROPERTY_WRITE_CACHING: "true", TopicSpec.PROPERTY_FLUSH_MS: 123, TopicSpec.PROPERTY_FLUSH_BYTES: 9999 }) - self.validate_topic_configs(WriteCachingMode.ON, 123, 9999) + self.validate_topic_configs(WriteCachingMode.TRUE, 123, 9999) # New topic with defaults topic = TopicSpec() @@ -131,42 +131,46 @@ def test_properties(self): self.rpk.create_topic(topic=topic.name, partitions=1, replicas=3) # Validate cluster defaults - self.validate_topic_configs(WriteCachingMode.OFF, 100, 262144) + self.validate_topic_configs(WriteCachingMode.FALSE, 100, 262144) # Changing cluster level configs - self.set_cluster_config(write_caching_conf, "on") - self.validate_topic_configs(WriteCachingMode.ON, 100, 262144) + self.set_cluster_config(write_caching_conf, WriteCachingMode.TRUE) + self.validate_topic_configs(WriteCachingMode.TRUE, 100, 262144) self.set_cluster_config(flush_ms_conf, 200) - self.validate_topic_configs(WriteCachingMode.ON, 200, 262144) + self.validate_topic_configs(WriteCachingMode.TRUE, 200, 262144) self.set_cluster_config(flush_bytes_conf, 32768) - self.validate_topic_configs(WriteCachingMode.ON, 200, 32768) + self.validate_topic_configs(WriteCachingMode.TRUE, 200, 32768) # Turn off write caching at topic level - self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, "off") - self.validate_topic_configs(WriteCachingMode.OFF, 200, 32768) + self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, + WriteCachingMode.FALSE) + self.validate_topic_configs(WriteCachingMode.FALSE, 200, 32768) # Turn off write caching at cluster level but enable at topic level # topic properties take precedence - self.set_cluster_config(write_caching_conf, "off") - self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, "on") - self.validate_topic_configs(WriteCachingMode.ON, 200, 32768) + self.set_cluster_config(write_caching_conf, WriteCachingMode.FALSE) + self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, + WriteCachingMode.TRUE) + self.validate_topic_configs(WriteCachingMode.TRUE, 200, 32768) # Kill switch test, disable write caching feature globally, # should override topic level property - self.set_cluster_config(write_caching_conf, "disabled") + self.set_cluster_config(write_caching_conf, WriteCachingMode.DISABLED) self.validate_topic_configs(WriteCachingMode.DISABLED, 200, 32768) # Try to update the topic property now, should throw an error try: - self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, "on") + self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, + WriteCachingMode.DISABLED) assert False, "No 
exception thrown when updating topic propertes in disabled mode." except RpkException as e: assert "INVALID_CONFIG" in str(e) # Enable again - self.set_cluster_config(write_caching_conf, "on") - self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, "on") - self.validate_topic_configs(WriteCachingMode.ON, 200, 32768) + self.set_cluster_config(write_caching_conf, WriteCachingMode.TRUE) + self.set_topic_properties(TopicSpec.PROPERTY_WRITE_CACHING, + WriteCachingMode.TRUE) + self.validate_topic_configs(WriteCachingMode.TRUE, 200, 32768) @cluster(num_nodes=3) def test_bad_properties(self): @@ -234,7 +238,7 @@ def validators(value_flush: int, value_no_flush: int): checker.evaluate(validators(num_events_with_flush, 0)) # Enable write caching - self.rpk.cluster_config_set("write_caching", "on") + self.rpk.cluster_config_set("write_caching_default", "true") num_events_without_flush = randint(199, 299) checker.evaluate( From 0239a11abd8aabb7348dd40b8735df67af960005 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 10 Apr 2024 23:40:03 -0700 Subject: [PATCH 4/7] consensus: redo background flushing Original implementation relied on a CV that used a high resolution timer and with a timer per raft group that proved to expensive at scale. Also, the flusher fired periodically regardless of whether there is pending unflushed data from lower acks. This commit redoes it using a deferred flush that uses a lowres clock and the the flushes are scheduled only if unflushed data is appended. Additionally the timers are canceled in favor of background flushes that are scheduled when flush.bytes is hit. This removes reliance on timers as much as possible and the timers should never fire in default acks=all usecases. --- src/v/raft/consensus.cc | 80 ++++++++++++++++++++++------------------- src/v/raft/consensus.h | 14 ++++++-- 2 files changed, 54 insertions(+), 40 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 4a8c0227bf4d..b85a9064dea1 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -156,7 +156,10 @@ consensus::consensus( maybe_step_down(); dispatch_vote(false); }); - ssx::spawn_with_gate(_bg, [this] { return background_flusher(); }); + _deferred_flusher.set_callback([this]() { + ssx::spawn_with_gate( + _bg, [this]() { return do_flush().discard_result(); }); + }); } void consensus::setup_metrics() { @@ -281,7 +284,7 @@ ss::future<> consensus::stop() { co_await _batcher.stop(); _op_lock.broken(); - _background_flusher.broken(); + _deferred_flusher.cancel(); co_await _bg.close(); // close writer if we have to @@ -2660,6 +2663,8 @@ append_entries_reply consensus::make_append_entries_reply( } ss::future consensus::flush_log() { + _deferred_flusher.cancel(); + _in_flight_flush = false; if (!has_pending_flushes()) { _last_flush_time = clock_type::now(); co_return flushed::no; @@ -2707,6 +2712,32 @@ ss::future consensus::flush_log() { co_return flushed::yes; } +void consensus::background_flush_log() { + _in_flight_flush = true; + // An imminent flush means that we do not need the + // scheduled flush anymore as this guarantees that everything + // up until this point is flushed. + _deferred_flusher.cancel(); + ssx::spawn_with_gate(_bg, [this]() { return do_flush().discard_result(); }); +} + +void consensus::maybe_schedule_flush() { + if (!has_pending_flushes() || _in_flight_flush) { + // if a flush is already pending, nothing to do. 
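+        // a flush that is already in flight will cover anything appended
+        // before it actually runs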
+ return; + } + if (_pending_flush_bytes >= _max_pending_flush_bytes) { + // max flush bytes exceeded, schedule a flush right away. + background_flush_log(); + return; + } + if (!_deferred_flusher.armed()) { + // No imminent flush and no deferred flush at this point, + // arm the timer to ensure appended data honors flush.ms + _deferred_flusher.arm(flush_ms()); + } +} + ss::future consensus::disk_append( model::record_batch_reader&& reader, update_last_quorum_index should_update_last_quorum_idx) { @@ -2755,7 +2786,7 @@ ss::future consensus::disk_append( // Here are are appending entries without a flush, signal the // flusher incase we hit the thresholds, particularly unflushed // bytes. - _background_flusher.signal(); + maybe_schedule_flush(); } // TODO // if we rolled a log segment. write current configuration @@ -3922,19 +3953,15 @@ void consensus::upsert_recovery_state( } } -ss::future<> consensus::maybe_flush_log() { - // if there is nothing to do exit without grabbing an op_lock, this check is - // sloppy as we data can be in flight but it is ok since next check will - // detect it and flush log. - if ( - _pending_flush_bytes < _max_pending_flush_bytes - && time_since_last_flush() < flush_ms()) { - co_return; - } +ss::future<> consensus::do_flush() { try { auto holder = _bg.hold(); - auto u = co_await _op_lock.get_units(); - auto flushed = co_await flush_log(); + auto flush = ssx::now(consensus::flushed::no); + { + auto u = co_await _op_lock.get_units(); + flush = flush_log(); + } + auto flushed = co_await std::move(flush); if (flushed && is_leader()) { for (auto& [id, idx] : _fstats) { // force full heartbeat to move the committed index forward @@ -3992,29 +4019,8 @@ void consensus::notify_config_update() { _write_caching_enabled = log_config().write_caching(); _max_pending_flush_bytes = log_config().flush_bytes(); _max_flush_delay = compute_max_flush_delay(); - // let the flusher know that the tunables have changed, this may result - // in an extra flush but that should be ok since this this is a rare - // operation. - _background_flusher.signal(); -} - -ss::future<> consensus::background_flusher() { - while (!_bg.is_closed()) { - try { - co_await std::visit( - [&](auto&& flush_delay) { - return _background_flusher.wait(flush_delay); - }, - _max_flush_delay); - } catch (const ss::condition_variable_timed_out&) { - } - co_await maybe_flush_log().handle_exception( - [this](const std::exception_ptr& ex) { - vlog( - _ctxlog.warn, - "Ignoring exception from background flush: {}", - ex); - }); + if (_deferred_flusher.armed()) { + _deferred_flusher.rearm(ss::lowres_clock::now() + flush_ms()); } } diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 864a5afa127f..752157b9828a 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -623,6 +623,9 @@ class consensus { /// \brief _does not_ hold the lock. using flushed = ss::bool_class; ss::future flush_log(); + /// flush spawned with a gate. + void background_flush_log(); + void maybe_schedule_flush(); void maybe_step_down(); @@ -766,8 +769,7 @@ class consensus { } flush_delay_t compute_max_flush_delay() const; - ss::future<> maybe_flush_log(); - ss::future<> background_flusher(); + ss::future<> do_flush(); // args vnode _self; @@ -821,6 +823,8 @@ class consensus { replicate_batcher _batcher; size_t _pending_flush_bytes{0}; clock_type::time_point _last_flush_time; + /// Ensures that we do not schedule multiple redudant flushes. 
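+    /// Reset at the start of flush_log().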
+ bool _in_flight_flush = false; /// used to wait for background ops before shutting down ss::gate _bg; @@ -899,7 +903,11 @@ class consensus { // Its a variant to workaround a bug in cv::wait() method, // check comment in compute_max_flush_delay() for details flush_delay_t _max_flush_delay; - ss::condition_variable _background_flusher; + + // Timer responsible for flush.ms. The timer is only armed when + // requests without an explicit flush arrive and the timer is + // immediately canceled if a flush is triggered via other means. + ss::timer _deferred_flusher; replication_monitor _replication_monitor; From a18299ae79d800eb32c1230de55ceb2ac14dc5ac Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Fri, 12 Apr 2024 23:57:46 -0700 Subject: [PATCH 5/7] tests/raft: deflake test_linarizable_barrier The test waits until a barrier is attained until the dirty offset. BOOST_REQUIRE_EQUAL(r.value(), leader_offsets.committed_offset); A barrier doesn't guarantee that out of the box, calling it may return an offset below dirty offset until which all hearbeats agree. If we want a barrier until a desired offset, we have to ensure the barrier returns >= desired offset, adjusts the calling code to do this. Underlying reason is that flush_log() (called from the barrier) maynot always guarantee a flush, it may just return right away if there is already a flush in progress (and hence nothing to flush). Meanwhile a round of heartbeats guarantee barrier progress and the barrier returns an earlier offset which doesn't include the dirty offset that pending a flush. --- src/v/raft/tests/append_entries_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/v/raft/tests/append_entries_test.cc b/src/v/raft/tests/append_entries_test.cc index 9639bbf71f83..9e451c08bdad 100644 --- a/src/v/raft/tests/append_entries_test.cc +++ b/src/v/raft/tests/append_entries_test.cc @@ -760,7 +760,8 @@ FIXTURE_TEST(test_linarizable_barrier, raft_test_fixture) { result l_offset) { r = l_offset; leader_offsets = leader_node.log->offsets(); - return l_offset.has_value(); + return l_offset.has_value() + && l_offset.value() == leader_offsets.dirty_offset; }); }) .get(); From b0ffde90ee9d97d8f7ee98c3a8f5fb6d54f349ae Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 11 Apr 2024 16:44:10 -0700 Subject: [PATCH 6/7] replication_monitor: factor replication detection into methods no logic changes, will be used in next commit. --- src/v/raft/replication_monitor.cc | 108 +++++++++++++++++------------- src/v/raft/replication_monitor.h | 4 ++ 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/src/v/raft/replication_monitor.cc b/src/v/raft/replication_monitor.cc index 7e8407ead517..0f8ffaba327a 100644 --- a/src/v/raft/replication_monitor.cc +++ b/src/v/raft/replication_monitor.cc @@ -109,6 +109,63 @@ ss::future replication_monitor::wait_until_majority_replicated( return do_wait_until(append, wait_type::majority_replication); } +std::optional replication_monitor::is_append_committed_or_truncated( + const storage::append_result& append_info) const { + auto committed_offset = _raft->committed_offset(); + auto committed_offset_term = _raft->get_term(committed_offset); + auto appended_term = _raft->get_term(append_info.last_offset); + // Truncation is sealed once the following events happen + // - There is new entry from a different term replacing the + // appended entries. + // - The new entry is committed. 
+ // The second condition is important because without that the + // original entries may be reinstanted after another leadership + // change. For example: + // + // 5 replicas A, B, C, D, E, leader=A, term=5 + // + // A - replicate([base: 10, last: 20]) term=5 + // A - append_local([10, 20]) term=5, dirty=20 + // A -> B - append_entries([10, 20]) term=5, dirty=20 + // A - frozen briefly, cannot send further append_entries + // (C, D, E), elect C as leader, term=6, dirty=9 + // C - append_local([10]) - configuration batch - term=6 + // C -> A append_entries([10]), term=6 + // C - crashes + // A truncates, term=6, dirty_offset=10 -> First truncation + // (B, D, E), elect B as leader, term=7, dirty=20 + // B -> A, D, E append_entries([10, 20]) + // committed offset = 20 + // + // In the above example if we do not wait for committed + // offset and stop at first truncation event, we risk an + // incorrect truncation detection. + auto truncated = [&] { + return committed_offset_term > append_info.last_term + && appended_term != append_info.last_term; + }; + if ( + appended_term == append_info.last_term + && committed_offset >= append_info.last_offset) { + // committed + return errc::success; + } else if (truncated()) { + // truncated + return errc::replicated_entry_truncated; + } + return std::nullopt; +} + +bool replication_monitor::is_append_replicated( + const storage::append_result& append_info) const { + auto replicated_term = _raft->get_term(append_info.last_offset); + // We need the offset replicated in the same term it is appended in + // for the replication to be successful. + // Truncation is detected based on committed offset updates. + return replicated_term == append_info.last_term + && _raft->_majority_replicated_index >= append_info.last_offset; +} + ss::future<> replication_monitor::do_notify_replicated() { if (_pending_majority_replication_waiters > 0) { auto majority_replicated_offset = _raft->_majority_replicated_index; @@ -117,14 +174,9 @@ ss::future<> replication_monitor::do_notify_replicated() { && it->first <= majority_replicated_offset) { auto& entry = it->second; auto& append_info = entry->append_info; - auto replicated_term = _raft->get_term(append_info.last_offset); if ( entry->type == wait_type::majority_replication - // We need the offset is majority replicated in the same term - // it is appended for the replication to be successful. - // Truncation is detected based on committed offset updates. - && replicated_term == append_info.last_term - && majority_replicated_offset >= append_info.last_offset) { + && is_append_replicated(append_info)) { entry->done.set_value(errc::success); it = _waiters.erase(it); } else { @@ -151,46 +203,10 @@ ss::future<> replication_monitor::do_notify_committed() { break; } - auto appended_term = _raft->get_term(append_info.last_offset); - // Truncation is sealed once the following events happen - // - There is new entry from a different term replacing the - // appended entries. - // - The new entry is committed. - // The second condition is important because without that the - // original entries may be reinstanted after another leadership - // change. 
For example: - // - // 5 replicas A, B, C, D, E, leader=A, term=5 - // - // A - replicate([base: 10, last: 20]) term=5 - // A - append_local([10, 20]) term=5, dirty=20 - // A -> B - append_entries([10, 20]) term=5, dirty=20 - // A - frozen briefly, cannot send further append_entries - // (C, D, E), elect C as leader, term=6, dirty=9 - // C - append_local([10]) - configuration batch - term=6 - // C -> A append_entries([10]), term=6 - // C - crashes - // A truncates, term=6, dirty_offset=10 -> First truncation - // (B, D, E), elect B as leader, term=7, dirty=20 - // B -> A, D, E append_entries([10, 20]) - // committed offset = 20 - // - // In the above example if we do not wait for committed - // offset and stop at first truncation event, we risk an - // incorrect truncation detection. - auto truncated = [&] { - return committed_offset_term > append_info.last_term - && appended_term != append_info.last_term; - }; - if ( - appended_term == append_info.last_term - && committed_offset >= append_info.last_offset) { - // committed - entry->done.set_value(errc::success); - it = _waiters.erase(it); - } else if (truncated()) { - // truncated - entry->done.set_value(errc::replicated_entry_truncated); + auto committed_or_truncated = is_append_committed_or_truncated( + append_info); + if (committed_or_truncated) { + entry->done.set_value(committed_or_truncated.value()); it = _waiters.erase(it); } else { ++it; diff --git a/src/v/raft/replication_monitor.h b/src/v/raft/replication_monitor.h index 126b8ddcb130..2ec755795589 100644 --- a/src/v/raft/replication_monitor.h +++ b/src/v/raft/replication_monitor.h @@ -83,6 +83,10 @@ class replication_monitor { ss::future do_wait_until(storage::append_result, wait_type); + bool is_append_replicated(const storage::append_result&) const; + std::optional + is_append_committed_or_truncated(const storage::append_result&) const; + consensus* _raft; // Key is the base offset of the append results. So all the waiters // are sorted by the base offsets of the results. From 53b82ba9459934b05895674f3e8374a3531a5716 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 11 Apr 2024 16:46:55 -0700 Subject: [PATCH 7/7] replication_monitor: fix missed notifications during race If the replication notification happens before wait_for_majority() is called, the waiter is never resolved. Adds an additional check before creating the waiter instance. 
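A rough interleaving of the race (paraphrased, not code from the patch):

    // replicate path                        // replication_monitor
    // append to local log
    // majority replicates the append
    //                                       notify of replication: no waiter
    //                                       registered yet, nothing resolved
    // wait_until_majority_replicated():
    //   registers a waiter that no later
    //   notification will ever resolve

With the added check, wait_until_majority_replicated() and wait_until_committed()
first consult the current state via is_append_replicated() /
is_append_committed_or_truncated() and complete immediately when the append is
already replicated, committed, or detected as truncated, falling back to a
waiter only otherwise.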
--- src/v/raft/replication_monitor.cc | 7 +++++++ src/v/raft/tests/basic_raft_fixture_test.cc | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/v/raft/replication_monitor.cc b/src/v/raft/replication_monitor.cc index 0f8ffaba327a..933ba5754003 100644 --- a/src/v/raft/replication_monitor.cc +++ b/src/v/raft/replication_monitor.cc @@ -101,11 +101,18 @@ ss::future replication_monitor::do_wait_until( ss::future replication_monitor::wait_until_committed(storage::append_result append) { + auto done = is_append_committed_or_truncated(append); + if (done) { + return ssx::now(done.value()); + } return do_wait_until(append, wait_type::commit); } ss::future replication_monitor::wait_until_majority_replicated( storage::append_result append) { + if (is_append_replicated(append)) { + return ssx::now(errc::success); + } return do_wait_until(append, wait_type::majority_replication); } diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index 40d8ce403fe8..76c0794cf33c 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -120,6 +120,24 @@ TEST_P_CORO(all_acks_fixture, validate_replication) { co_await assert_logs_equal(); } +TEST_P_CORO(all_acks_fixture, single_node_replication) { + co_await create_simple_group(1); + + auto params = GetParam(); + co_await set_write_caching(params.write_caching); + + auto leader = co_await wait_for_leader(10s); + auto& leader_node = node(leader); + + auto result = co_await leader_node.raft()->replicate( + make_batches({{"k_1", "v_1"}}), replicate_options(params.c_lvl)); + ASSERT_TRUE_CORO(result.has_value()); + + // wait for committed offset to propagate + co_await wait_for_committed_offset(result.value().last_offset, 5s); + co_await assert_logs_equal(); +} + TEST_P_CORO(all_acks_fixture, validate_recovery) { co_await create_simple_group(5); auto leader = co_await wait_for_leader(10s);