Skip to content

Commit

Permalink
Merge pull request #17823 from bharathv/wc_followups_2
Browse files Browse the repository at this point in the history
write caching follow ups
  • Loading branch information
piyushredpanda authored Apr 16, 2024
2 parents a2b89af + 53b82ba commit 315a09e
Show file tree
Hide file tree
Showing 35 changed files with 299 additions and 240 deletions.
7 changes: 7 additions & 0 deletions src/go/rpk/pkg/cli/cluster/config/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ func importConfig(
if vInt, ok := v.(int); ok {
v = float64(vInt)
}
} else if meta.Type == "string" {
// Some boolean configurations are inherently strings
// in which case we type switch to string for correct comparison
// below. yaml loses this type information during export.
if vBool, ok := v.(bool); ok {
v = fmt.Sprintf("%v", vBool)
}
} else if meta.Type == "array" && meta.Items.Type == "string" {
switch vArray := v.(type) {
case []interface{}:
Expand Down
4 changes: 2 additions & 2 deletions src/go/rpk/pkg/cli/redpanda/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ func setContainerModeCfgFields(y *config.RedpandaYaml) {
y.Redpanda.Other["fetch_reads_debounce_timeout"] = 10
y.Redpanda.Other["group_initial_rebalance_delay"] = 0
y.Redpanda.Other["log_segment_size_min"] = 1
y.Redpanda.Other["write_caching"] = "on"
y.Redpanda.Other["write_caching_default"] = "true"
}

func getOrFindInstallDir(fs afero.Fs, installDir string) (string, error) {
Expand Down Expand Up @@ -992,7 +992,7 @@ environments:
* fetch_reads_debounce_timeout: 10
* group_initial_rebalance_delay: 0
* log_segment_size_min: 1
* write_caching: on
* write_caching_default: true
After redpanda starts you can modify the cluster properties using:
rpk config set <key> <value>`
8 changes: 4 additions & 4 deletions src/go/rpk/pkg/cli/redpanda/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestStartCommand(t *testing.T) {
"fetch_reads_debounce_timeout": 10,
"group_initial_rebalance_delay": 0,
"log_segment_size_min": 1,
"write_caching": "on",
"write_caching_default": "true",
}
expYAML, err := yaml.Marshal(c)
require.NoError(st, err)
Expand Down Expand Up @@ -1537,7 +1537,7 @@ func TestStartCommand(t *testing.T) {
"fetch_reads_debounce_timeout": 10,
"group_initial_rebalance_delay": 0,
"log_segment_size_min": 1,
"write_caching": "on",
"write_caching_default": "true",
}
require.Equal(st, expectedClusterFields, y.Redpanda.Other)
},
Expand Down Expand Up @@ -1590,7 +1590,7 @@ func TestStartCommand(t *testing.T) {
"fetch_reads_debounce_timeout": 10,
"group_initial_rebalance_delay": 0,
"log_segment_size_min": 1,
"write_caching": "on",
"write_caching_default": "true",
}
require.Nil(st, y.Redpanda.ID)
require.Equal(st, true, y.Redpanda.DeveloperMode)
Expand Down Expand Up @@ -1634,7 +1634,7 @@ func TestStartCommand(t *testing.T) {
"fetch_reads_debounce_timeout": 10,
"group_initial_rebalance_delay": 0,
"log_segment_size_min": 1,
"write_caching": "on",
"write_caching_default": "true",
}
require.Exactly(st, expectedClusterFields, y.Redpanda.Other)
},
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ partition_raft_state get_partition_raft_state(consensus_ptr ptr) {
raft_state.write_caching_enabled = ptr->write_caching_enabled();
raft_state.flush_bytes = ptr->flush_bytes();
raft_state.flush_ms = ptr->flush_ms();
raft_state.time_since_last_flush = ptr->time_since_last_flush();
raft_state.replication_monitor_state = fmt::format(
"{}", ptr->get_replication_monitor());

Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ metadata_cache::get_topic_write_caching_mode(
return std::nullopt;
}
if (
config::shard_local_cfg().write_caching()
config::shard_local_cfg().write_caching_default()
== model::write_caching_mode::disabled) {
return model::write_caching_mode::disabled;
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/topic_properties_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ inline cluster::topic_properties random_topic_properties() {
properties.remote_delete = false;
properties.write_caching = tests::random_optional([] {
return random_generators::random_choice(
{model::write_caching_mode::on,
model::write_caching_mode::off,
{model::write_caching_mode::default_true,
model::write_caching_mode::default_false,
model::write_caching_mode::disabled});
});
properties.flush_ms = tests::random_optional(
Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3975,7 +3975,7 @@ struct partition_stm_state
struct partition_raft_state
: serde::envelope<
partition_raft_state,
serde::version<4>,
serde::version<5>,
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;

Expand All @@ -4001,6 +4001,7 @@ struct partition_raft_state
size_t flush_bytes;
std::chrono::milliseconds flush_ms;
ss::sstring replication_monitor_state;
std::chrono::milliseconds time_since_last_flush;

struct follower_state
: serde::envelope<
Expand Down Expand Up @@ -4093,7 +4094,8 @@ struct partition_raft_state
write_caching_enabled,
flush_bytes,
flush_ms,
replication_monitor_state);
replication_monitor_state,
time_since_last_flush);
}

friend bool
Expand Down
4 changes: 2 additions & 2 deletions src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,8 @@ struct instance_generator<cluster::topic_properties> {
[] { return model::vcluster_id(random_xid()); }),
tests::random_optional([] {
return random_generators::random_choice(
{model::write_caching_mode::on,
model::write_caching_mode::off,
{model::write_caching_mode::default_true,
model::write_caching_mode::default_false,
model::write_caching_mode::disabled});
}),
tests::random_optional([] { return tests::random_duration_ms(); }),
Expand Down
12 changes: 6 additions & 6 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -968,20 +968,20 @@ configuration::configuration()
"one follower",
{.visibility = visibility::tunable},
16)
, write_caching(
, write_caching_default(
*this,
"write_caching",
"write_caching_default",
"Cache batches until the segment appender chunk is full instead of "
"flushing for every acks=all write. This is the global default "
"for all topics and can be overriden at a topic scope with property "
"write.caching. 'disabled' mode takes precedence over topic overrides "
"and disables the feature altogether for the entire cluster.",
{.needs_restart = needs_restart::no,
.example = "on",
.example = "true",
.visibility = visibility::user},
model::write_caching_mode::off,
{model::write_caching_mode::on,
model::write_caching_mode::off,
model::write_caching_mode::default_false,
{model::write_caching_mode::default_true,
model::write_caching_mode::default_false,
model::write_caching_mode::disabled})
, reclaim_min_size(
*this,
Expand Down
2 changes: 1 addition & 1 deletion src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ struct configuration final : public config_store {
property<bool> raft_recovery_throttle_disable_dynamic_mode;
property<std::optional<uint32_t>> raft_smp_max_non_local_requests;
property<uint32_t> raft_max_concurrent_append_requests_per_follower;
enum_property<model::write_caching_mode> write_caching;
enum_property<model::write_caching_mode> write_caching_default;

property<size_t> reclaim_min_size;
property<size_t> reclaim_max_size;
Expand Down
9 changes: 5 additions & 4 deletions src/v/kafka/server/handlers/configs/config_response_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,16 +595,17 @@ config_response_container_t make_topic_configs(
add_topic_config_if_requested(
config_keys,
result,
config::shard_local_cfg().write_caching.name(),
config::shard_local_cfg().write_caching(),
config::shard_local_cfg().write_caching_default.name(),
config::shard_local_cfg().write_caching_default(),
topic_property_write_caching,
(config::shard_local_cfg().write_caching()
(config::shard_local_cfg().write_caching_default()
== model::write_caching_mode::disabled)
? model::write_caching_mode::disabled
: topic_properties.write_caching,
include_synonyms,
maybe_make_documentation(
include_documentation, config::shard_local_cfg().write_caching.desc()),
include_documentation,
config::shard_local_cfg().write_caching_default.desc()),
&describe_as_string<model::write_caching_mode>);

add_topic_config_if_requested(
Expand Down
7 changes: 4 additions & 3 deletions src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,11 @@ struct write_caching_config_validator {
"Invalid value {} for {}, accepted values: [{}, {}]",
value,
topic_property_write_caching,
model::write_caching_mode::on,
model::write_caching_mode::off);
model::write_caching_mode::default_true,
model::write_caching_mode::default_false);
}
auto cluster_default = config::shard_local_cfg().write_caching();
auto cluster_default
= config::shard_local_cfg().write_caching_default();
if (cluster_default == model::write_caching_mode::disabled) {
return fmt::format("write caching disabled at cluster level");
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/topics/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ struct write_caching_configs_validator {
return mode != model::write_caching_mode::disabled;
}
// write caching cannot be turned on for internal topics.
return mode != model::write_caching_mode::on;
return mode != model::write_caching_mode::default_true;
}

static bool validate_flush_ms(const creatable_topic& c) {
Expand Down
32 changes: 16 additions & 16 deletions src/v/kafka/server/tests/alter_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ FIXTURE_TEST(test_alter_single_topic_config, alter_config_test_fixture) {
properties.emplace("cleanup.policy", "compact");
properties.emplace("redpanda.remote.read", "true");
properties.emplace("replication.factor", "1");
properties.emplace("write.caching", "on");
properties.emplace("write.caching", "true");
properties.emplace("flush.ms", "225");
properties.emplace("flush.bytes", "32468");

Expand Down Expand Up @@ -485,14 +485,14 @@ FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) {
properties_1.emplace("retention.ms", "1234");
properties_1.emplace("cleanup.policy", "compact");
properties_1.emplace("replication.factor", "1");
properties_1.emplace("write.caching", "on");
properties_1.emplace("write.caching", "true");
properties_1.emplace("flush.ms", "225");
properties_1.emplace("flush.bytes", "32468");

absl::flat_hash_map<ss::sstring, ss::sstring> properties_2;
properties_2.emplace("retention.bytes", "4096");
properties_2.emplace("replication.factor", "1");
properties_2.emplace("write.caching", "off");
properties_2.emplace("write.caching", "false");
properties_2.emplace("flush.ms", "100");
properties_2.emplace("flush.bytes", "990");

Expand All @@ -514,13 +514,13 @@ FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) {
topic_1, "retention.ms", fmt::format("{}", 1234ms), describe_resp_1);
assert_property_value(
topic_1, "cleanup.policy", "compact", describe_resp_1);
assert_property_value(topic_1, "write.caching", "on", describe_resp_1);
assert_property_value(topic_1, "write.caching", "true", describe_resp_1);
assert_property_value(topic_1, "flush.ms", "225", describe_resp_1);
assert_property_value(topic_1, "flush.bytes", "32468", describe_resp_1);

auto describe_resp_2 = describe_configs(topic_2);
assert_property_value(topic_2, "retention.bytes", "4096", describe_resp_2);
assert_property_value(topic_2, "write.caching", "off", describe_resp_2);
assert_property_value(topic_2, "write.caching", "false", describe_resp_2);
assert_property_value(topic_2, "flush.ms", "100", describe_resp_2);
assert_property_value(topic_2, "flush.bytes", "990", describe_resp_2);
}
Expand Down Expand Up @@ -581,7 +581,7 @@ FIXTURE_TEST(
absl::flat_hash_map<ss::sstring, ss::sstring> properties;
properties.emplace("retention.ms", "1234");
properties.emplace("replication.factor", "1");
properties.emplace("write.caching", "on");
properties.emplace("write.caching", "true");
properties.emplace("flush.ms", "225");
properties.emplace("flush.bytes", "32468");

Expand All @@ -596,7 +596,7 @@ FIXTURE_TEST(
auto describe_resp = describe_configs(test_tp);
assert_property_value(
test_tp, "retention.ms", fmt::format("{}", 1234ms), describe_resp);
assert_property_value(test_tp, "write.caching", "on", describe_resp);
assert_property_value(test_tp, "write.caching", "true", describe_resp);
assert_property_value(test_tp, "flush.ms", "225", describe_resp);
assert_property_value(test_tp, "flush.bytes", "32468", describe_resp);

Expand All @@ -606,7 +606,7 @@ FIXTURE_TEST(
absl::flat_hash_map<ss::sstring, ss::sstring> new_properties;
new_properties.emplace("retention.bytes", "4096");
new_properties.emplace("replication.factor", "1");
new_properties.emplace("write.caching", "off");
new_properties.emplace("write.caching", "false");
new_properties.emplace("flush.ms", "9999");
new_properties.emplace("flush.bytes", "8888");

Expand All @@ -622,7 +622,7 @@ FIXTURE_TEST(
new_describe_resp);
assert_property_value(
test_tp, "retention.bytes", "4096", new_describe_resp);
assert_property_value(test_tp, "write.caching", "off", new_describe_resp);
assert_property_value(test_tp, "write.caching", "false", new_describe_resp);
assert_property_value(test_tp, "flush.ms", "9999", new_describe_resp);
assert_property_value(test_tp, "flush.bytes", "8888", new_describe_resp);
}
Expand All @@ -641,7 +641,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) {
std::make_pair("1234", kafka::config_resource_operation::set));
properties.emplace(
"write.caching",
std::make_pair("on", kafka::config_resource_operation::set));
std::make_pair("true", kafka::config_resource_operation::set));
properties.emplace(
"flush.ms",
std::make_pair("1234", kafka::config_resource_operation::set));
Expand All @@ -660,7 +660,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) {
auto describe_resp = describe_configs(test_tp);
assert_property_value(
test_tp, "retention.ms", fmt::format("{}", 1234ms), describe_resp);
assert_property_value(test_tp, "write.caching", "on", describe_resp);
assert_property_value(test_tp, "write.caching", "true", describe_resp);
assert_property_value(test_tp, "flush.ms", "1234", describe_resp);
assert_property_value(test_tp, "flush.bytes", "5678", describe_resp);

Expand All @@ -676,7 +676,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) {
std::pair{"4096", kafka::config_resource_operation::set});
new_properties.emplace(
"write.caching",
std::make_pair("off", kafka::config_resource_operation::set));
std::make_pair("false", kafka::config_resource_operation::set));

incremental_alter_configs(
{make_incremental_alter_topic_config_resource(test_tp, new_properties)});
Expand All @@ -687,7 +687,7 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) {
test_tp, "retention.ms", fmt::format("{}", 1234ms), new_describe_resp);
assert_property_value(
test_tp, "retention.bytes", "4096", new_describe_resp);
assert_property_value(test_tp, "write.caching", "off", new_describe_resp);
assert_property_value(test_tp, "write.caching", "false", new_describe_resp);
assert_property_value(test_tp, "flush.ms", "1234", new_describe_resp);
assert_property_value(test_tp, "flush.bytes", "5678", new_describe_resp);
}
Expand Down Expand Up @@ -728,7 +728,7 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) {
std::make_pair("1234", kafka::config_resource_operation::set));
properties.emplace(
"write.caching",
std::make_pair("on", kafka::config_resource_operation::set));
std::make_pair("true", kafka::config_resource_operation::set));
properties.emplace(
"flush.ms",
std::make_pair("9999", kafka::config_resource_operation::set));
Expand All @@ -747,7 +747,7 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) {
auto describe_resp = describe_configs(test_tp);
assert_property_value(
test_tp, "retention.ms", fmt::format("{}", 1234ms), describe_resp);
assert_property_value(test_tp, "write.caching", "on", describe_resp);
assert_property_value(test_tp, "write.caching", "true", describe_resp);
assert_property_value(test_tp, "flush.ms", "9999", describe_resp);
assert_property_value(test_tp, "flush.bytes", "8888", describe_resp);

Expand Down Expand Up @@ -789,7 +789,7 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) {
assert_property_value(
test_tp,
"write.caching",
fmt::format("{}", config::shard_local_cfg().write_caching()),
fmt::format("{}", config::shard_local_cfg().write_caching_default()),
new_describe_resp);
assert_property_value(
test_tp,
Expand Down
20 changes: 10 additions & 10 deletions src/v/model/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -555,21 +555,21 @@ using vcluster_id = named_type<xid, struct v_cluster_id_tag>;
* Type that represents the cluster wide write caching mode.
*/
enum class write_caching_mode : uint8_t {
// on by default for all topics
on = 0,
// off by default for all topics
off = 1,
// disabled by default clusterwide.
// cannot be overriden at topic level.
// true by default for all topics
default_true = 0,
// false by default for all topics
default_false = 1,
// disabled across all topics even for those
// with overrides. kill switch.
disabled = 2
};

constexpr const char* write_caching_mode_to_string(write_caching_mode s) {
switch (s) {
case write_caching_mode::on:
return "on";
case write_caching_mode::off:
return "off";
case write_caching_mode::default_true:
return "true";
case write_caching_mode::default_false:
return "false";
case write_caching_mode::disabled:
return "disabled";
default:
Expand Down
Loading

0 comments on commit 315a09e

Please sign in to comment.