Skip to content

Commit

Permalink
Merge pull request #16924 from bharathv/wc1
Browse files Browse the repository at this point in the history
write caching - configurations
  • Loading branch information
bharathv authored Mar 8, 2024
2 parents f5fb15d + bd9ae19 commit 60384c8
Show file tree
Hide file tree
Showing 42 changed files with 981 additions and 88 deletions.
3 changes: 3 additions & 0 deletions src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ partition_raft_state get_partition_raft_state(consensus_ptr ptr) {
raft_state.has_pending_flushes = ptr->has_pending_flushes();
raft_state.is_leader = ptr->is_leader();
raft_state.is_elected_leader = ptr->is_elected_leader();
raft_state.write_caching_enabled = ptr->write_caching_enabled();
raft_state.flush_bytes = ptr->flush_bytes();
raft_state.flush_ms = ptr->flush_ms();

const auto& fstats = ptr->get_follower_stats();
if (ptr->is_elected_leader() && fstats.size() > 0) {
Expand Down
15 changes: 15 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,19 @@ const topic_disabled_partitions_set* metadata_cache::get_topic_disabled_set(
return _topics_state.local().get_topic_disabled_set(ns_tp);
}

/// Resolves the write caching mode that applies to a topic.
///
/// Returns:
///  - std::nullopt when no configuration is found for `tp`;
///  - write_caching_mode::disabled when the write_caching setting in
///    config::shard_local_cfg() is `disabled`, regardless of the topic's
///    own property;
///  - otherwise the topic's `write_caching` property as stored (which may
///    itself be unset).
std::optional<model::write_caching_mode>
metadata_cache::get_topic_write_caching_mode(
  model::topic_namespace_view tp) const {
    const auto topic_cfg = get_topic_cfg(tp);
    if (!topic_cfg) {
        return std::nullopt;
    }

    // The global setting wins over the per-topic property when it is
    // `disabled`; the topic lookup still happens first so that unknown
    // topics keep reporting std::nullopt.
    const bool globally_disabled
      = config::shard_local_cfg().write_caching()
        == model::write_caching_mode::disabled;
    if (globally_disabled) {
        return model::write_caching_mode::disabled;
    }

    return topic_cfg->properties.write_caching;
}

} // namespace cluster
3 changes: 3 additions & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ class metadata_cache {
const topic_disabled_partitions_set*
get_topic_disabled_set(model::topic_namespace_view) const;

/// Returns the write caching mode that applies to the given topic.
///
/// The result is std::nullopt when the topic's configuration is not found.
/// When the write_caching setting in config::shard_local_cfg() is
/// `disabled`, the result is `disabled` irrespective of the topic property;
/// otherwise the topic's own `write_caching` property (possibly unset) is
/// returned.
std::optional<model::write_caching_mode>
get_topic_write_caching_mode(model::topic_namespace_view) const;

private:
ss::sharded<topic_table>& _topics_state;
ss::sharded<members_table>& _members_table;
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,8 @@ ss::future<> partition::update_configuration(topic_properties properties) {
_topic_cfg->properties = std::move(properties);
}

_raft->notify_config_update();

// If this partition's cloud storage mode changed, rebuild the archiver.
// This must happen after raft update, because it reads raft's
// ntp_config to decide whether to construct an archiver.
Expand Down
10 changes: 10 additions & 0 deletions src/v/cluster/tests/topic_properties_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ inline cluster::topic_properties random_topic_properties() {

// Always set remote_delete=false to survive an ADL roundtrip
properties.remote_delete = false;
properties.write_caching = tests::random_optional([] {
return random_generators::random_choice(
{model::write_caching_mode::on,
model::write_caching_mode::off,
model::write_caching_mode::disabled});
});
properties.flush_ms = tests::random_optional(
[] { return tests::random_duration_ms(); });
properties.flush_bytes = tests::random_optional(
[] { return random_generators::get_int<size_t>(); });

return properties;
}
3 changes: 3 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,9 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
incremental_update(
properties.initial_retention_local_target_ms,
overrides.initial_retention_local_target_ms);
incremental_update(properties.write_caching, overrides.write_caching);
incremental_update(properties.flush_ms, overrides.flush_ms);
incremental_update(properties.flush_bytes, overrides.flush_bytes);
// no configuration change, no need to generate delta
if (properties == properties_snapshot) {
co_return errc::success;
Expand Down
46 changes: 40 additions & 6 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ bool topic_properties::has_overrides() const {
|| record_value_subject_name_strategy.has_value()
|| record_value_subject_name_strategy_compat.has_value()
|| initial_retention_local_target_bytes.is_engaged()
|| initial_retention_local_target_ms.is_engaged();
|| initial_retention_local_target_ms.is_engaged()
|| write_caching.has_value() || flush_ms.has_value()
|| flush_bytes.has_value();
}

bool topic_properties::requires_remote_erase() const {
Expand Down Expand Up @@ -227,6 +229,9 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.initial_retention_local_target_bytes
= initial_retention_local_target_bytes;
ret.initial_retention_local_target_ms = initial_retention_local_target_ms;
ret.write_caching = write_caching;
ret.flush_ms = flush_ms;
ret.flush_bytes = flush_bytes;
return ret;
}

Expand Down Expand Up @@ -354,7 +359,10 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"record_value_subject_name_strategy_compat: {}, "
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}, "
"mpx_virtual_cluster_id: {}}}",
// These adjacent literals must NOT be separated by commas: they are
// concatenated into the single format string. A comma here would end the
// format string early and turn the remaining literals into format
// arguments, shifting every printed value and dropping the trailing fields.
"mpx_virtual_cluster_id: {}, "
"write_caching: {}, "
"flush_ms: {}, "
"flush_bytes: {}}}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -382,7 +390,10 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.record_value_subject_name_strategy_compat,
properties.initial_retention_local_target_bytes,
properties.initial_retention_local_target_ms,
properties.mpx_virtual_cluster_id);
properties.mpx_virtual_cluster_id,
properties.write_caching,
properties.flush_ms,
properties.flush_bytes);

return o;
}
Expand Down Expand Up @@ -654,7 +665,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
// One field per literal, each terminated by ", " so that consecutive
// fields do not run together in the rendered output.
"record_value_subject_name_strategy: {}, "
"record_value_subject_name_strategy_compat: {}, "
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}, "
"write_caching: {}, "
"flush_ms: {}, "
"flush_bytes: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
Expand All @@ -677,7 +689,10 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.record_value_subject_name_strategy,
i.record_value_subject_name_strategy_compat,
i.initial_retention_local_target_bytes,
i.initial_retention_local_target_ms);
i.initial_retention_local_target_ms,
i.write_caching,
i.flush_ms,
i.flush_bytes);
return o;
}

Expand Down Expand Up @@ -1811,7 +1826,10 @@ void adl<cluster::incremental_topic_updates>::to(
t.record_value_subject_name_strategy,
t.record_value_subject_name_strategy_compat,
t.initial_retention_local_target_bytes,
t.initial_retention_local_target_ms);
t.initial_retention_local_target_ms,
t.write_caching,
t.flush_ms,
t.flush_bytes);
}

cluster::incremental_topic_updates
Expand Down Expand Up @@ -1933,6 +1951,19 @@ adl<cluster::incremental_topic_updates>::from(iobuf_parser& in) {
.from(in);
}

if (
version
<= cluster::incremental_topic_updates::version_with_write_caching) {
updates.write_caching = adl<cluster::property_update<
std::optional<model::write_caching_mode>>>{}
.from(in);
updates.flush_ms = adl<cluster::property_update<
std::optional<std::chrono::milliseconds>>>{}
.from(in);
updates.flush_bytes
= adl<cluster::property_update<std::optional<size_t>>>{}.from(in);
}

return updates;
}

Expand Down Expand Up @@ -2198,6 +2229,9 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
std::nullopt,
tristate<size_t>{std::nullopt},
tristate<std::chrono::milliseconds>{std::nullopt},
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt};
}

Expand Down
44 changes: 35 additions & 9 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ struct remote_topic_properties
*/
struct topic_properties
: serde::
envelope<topic_properties, serde::version<7>, serde::compat_version<0>> {
envelope<topic_properties, serde::version<8>, serde::compat_version<0>> {
topic_properties() noexcept = default;
topic_properties(
std::optional<model::compression> compression,
Expand Down Expand Up @@ -1507,7 +1507,10 @@ struct topic_properties
record_value_subject_name_strategy_compat,
tristate<size_t> initial_retention_local_target_bytes,
tristate<std::chrono::milliseconds> initial_retention_local_target_ms,
std::optional<model::vcluster_id> mpx_virtual_cluster_id)
std::optional<model::vcluster_id> mpx_virtual_cluster_id,
std::optional<model::write_caching_mode> write_caching,
std::optional<std::chrono::milliseconds> flush_ms,
std::optional<size_t> flush_bytes)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -1540,7 +1543,10 @@ struct topic_properties
, initial_retention_local_target_bytes(
initial_retention_local_target_bytes)
, initial_retention_local_target_ms(initial_retention_local_target_ms)
, mpx_virtual_cluster_id(mpx_virtual_cluster_id) {}
, mpx_virtual_cluster_id(mpx_virtual_cluster_id)
, write_caching(write_caching)
, flush_ms(flush_ms)
, flush_bytes(flush_bytes) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -1583,6 +1589,9 @@ struct topic_properties
tristate<std::chrono::milliseconds> initial_retention_local_target_ms{
std::nullopt};
std::optional<model::vcluster_id> mpx_virtual_cluster_id;
std::optional<model::write_caching_mode> write_caching;
std::optional<std::chrono::milliseconds> flush_ms;
std::optional<size_t> flush_bytes;

bool is_compacted() const;
bool has_overrides() const;
Expand Down Expand Up @@ -1620,7 +1629,10 @@ struct topic_properties
record_value_subject_name_strategy_compat,
initial_retention_local_target_bytes,
initial_retention_local_target_ms,
mpx_virtual_cluster_id);
mpx_virtual_cluster_id,
write_caching,
flush_ms,
flush_bytes);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down Expand Up @@ -1708,7 +1720,7 @@ struct property_update<tristate<T>>
struct incremental_topic_updates
: serde::envelope<
incremental_topic_updates,
serde::version<5>,
serde::version<6>,
serde::compat_version<0>> {
static constexpr int8_t version_with_data_policy = -1;
static constexpr int8_t version_with_shadow_indexing = -3;
Expand All @@ -1717,14 +1729,16 @@ struct incremental_topic_updates
static constexpr int8_t version_with_segment_ms = -5;
static constexpr int8_t version_with_schema_id_validation = -6;
static constexpr int8_t version_with_initial_retention = -7;
static constexpr int8_t version_with_write_caching = -8;
// negative version indicating different format:
// -1 - topic_updates with data_policy
// -2 - topic_updates without data_policy
// -3 - topic_updates with shadow_indexing
// -4 - topic update with batch_max_bytes and retention.local.target
// -5 - topic updates with segment_ms
// -6 - topic updates with schema id validation
// -7 - topic updates with initial retention
// -8 - topic updates with write caching properties (write_caching,
//      flush_ms, flush_bytes)
// `version` always names the newest format listed above.
static constexpr int8_t version = version_with_write_caching;
property_update<std::optional<model::compression>> compression;
property_update<std::optional<model::cleanup_policy_bitflags>>
cleanup_policy_bitflags;
Expand Down Expand Up @@ -1762,6 +1776,9 @@ struct incremental_topic_updates
property_update<tristate<size_t>> initial_retention_local_target_bytes;
property_update<tristate<std::chrono::milliseconds>>
initial_retention_local_target_ms;
property_update<std::optional<model::write_caching_mode>> write_caching;
property_update<std::optional<std::chrono::milliseconds>> flush_ms;
property_update<std::optional<size_t>> flush_bytes;

auto serde_fields() {
return std::tie(
Expand All @@ -1787,7 +1804,10 @@ struct incremental_topic_updates
record_value_subject_name_strategy,
record_value_subject_name_strategy_compat,
initial_retention_local_target_bytes,
initial_retention_local_target_ms);
initial_retention_local_target_ms,
write_caching,
flush_ms,
flush_bytes);
}

friend std::ostream&
Expand Down Expand Up @@ -3876,7 +3896,7 @@ struct partition_stm_state
struct partition_raft_state
: serde::envelope<
partition_raft_state,
serde::version<2>,
serde::version<3>,
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;

Expand All @@ -3898,6 +3918,9 @@ struct partition_raft_state
bool is_leader;
bool is_elected_leader;
std::vector<partition_stm_state> stms;
// Write caching state reported for the partition. The members are
// value-initialized so that an instance which never has them assigned
// carries well-defined values instead of indeterminate ones.
// NOTE(review): presumably relevant when decoding a payload from a peer
// that predates these fields - confirm against serde's envelope handling.
bool write_caching_enabled{false};
size_t flush_bytes{0};
std::chrono::milliseconds flush_ms{0};

struct follower_state
: serde::envelope<
Expand Down Expand Up @@ -3986,7 +4009,10 @@ struct partition_raft_state
is_elected_leader,
followers,
stms,
recovery_state);
recovery_state,
write_caching_enabled,
flush_bytes,
flush_ms);
}

friend bool
Expand Down
19 changes: 19 additions & 0 deletions src/v/compat/cluster_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ struct compat_check<cluster::topic_properties> {
json_write(initial_retention_local_target_bytes);
json_write(initial_retention_local_target_ms);
json_write(mpx_virtual_cluster_id);
json::write_exceptional_member_type(
wr, "write_caching", obj.write_caching);
json_write(flush_ms);
json_write(flush_bytes);
}

static cluster::topic_properties from_json(json::Value& rd) {
Expand Down Expand Up @@ -383,6 +387,9 @@ struct compat_check<cluster::topic_properties> {
json_read(initial_retention_local_target_bytes);
json_read(initial_retention_local_target_ms);
json_read(mpx_virtual_cluster_id);
json_read(write_caching);
json_read(flush_ms);
json_read(flush_bytes);
return obj;
}

Expand Down Expand Up @@ -410,6 +417,9 @@ struct compat_check<cluster::topic_properties> {
obj.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};
obj.mpx_virtual_cluster_id = std::nullopt;
obj.write_caching = std::nullopt;
obj.flush_bytes = std::nullopt;
obj.flush_ms = std::nullopt;

if (reply != obj) {
throw compat_error(fmt::format(
Expand Down Expand Up @@ -484,6 +494,9 @@ struct compat_check<cluster::topic_configuration> {
std::nullopt};
obj.properties.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};
obj.properties.write_caching = std::nullopt;
obj.properties.flush_bytes = std::nullopt;
obj.properties.flush_ms = std::nullopt;

obj.properties.mpx_virtual_cluster_id = std::nullopt;

Expand Down Expand Up @@ -550,6 +563,9 @@ struct compat_check<cluster::create_topics_request> {
topic.properties.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};
topic.properties.mpx_virtual_cluster_id = std::nullopt;
topic.properties.write_caching = std::nullopt;
topic.properties.flush_bytes = std::nullopt;
topic.properties.flush_ms = std::nullopt;
}
if (req != obj) {
throw compat_error(fmt::format(
Expand Down Expand Up @@ -613,6 +629,9 @@ struct compat_check<cluster::create_topics_reply> {
topic.properties.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};
topic.properties.mpx_virtual_cluster_id = std::nullopt;
topic.properties.write_caching = std::nullopt;
topic.properties.flush_bytes = std::nullopt;
topic.properties.flush_ms = std::nullopt;
}
if (reply != obj) {
throw compat_error(fmt::format(
Expand Down
11 changes: 10 additions & 1 deletion src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,16 @@ struct instance_generator<cluster::topic_properties> {
[] { return random_generators::get_int<size_t>(); }),
tests::random_tristate([] { return tests::random_duration_ms(); }),
tests::random_optional(
[] { return model::vcluster_id(random_xid()); })};
[] { return model::vcluster_id(random_xid()); }),
tests::random_optional([] {
return random_generators::random_choice(
{model::write_caching_mode::on,
model::write_caching_mode::off,
model::write_caching_mode::disabled});
}),
tests::random_optional([] { return tests::random_duration_ms(); }),
tests::random_optional(
[] { return random_generators::get_int<size_t>(); })};
}

static std::vector<cluster::topic_properties> limits() { return {}; }
Expand Down
Loading

0 comments on commit 60384c8

Please sign in to comment.