From 10d5ee08a0c50bd138a9ffcfd3d473d0790ee48f Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 08:22:52 +0100
Subject: [PATCH 01/11] r/stm: exposed insert_linearizable_barrier method

Exposed `insert_linearizable_barrier` from `raft::state_machine`.

Signed-off-by: Michal Maslanka
---
 src/v/raft/state_machine.cc |  8 ++++++++
 src/v/raft/state_machine.h  | 10 ++++++++++
 2 files changed, 18 insertions(+)

diff --git a/src/v/raft/state_machine.cc b/src/v/raft/state_machine.cc
index 7d9d8a39d20e0..aa7cba544e4b1 100644
--- a/src/v/raft/state_machine.cc
+++ b/src/v/raft/state_machine.cc
@@ -129,4 +129,12 @@ ss::future<> state_machine::write_last_applied(model::offset o) {
     return _raft->write_last_applied(o);
 }
 
+ss::future<result<model::offset>> state_machine::insert_linearizable_barrier(
+  model::timeout_clock::time_point timeout) {
+    return ss::with_timeout(timeout, _raft->linearizable_barrier())
+      .handle_exception_type([](const ss::timed_out_error&) {
+          return result<model::offset>(errc::timeout);
+      });
+}
+
 } // namespace raft

diff --git a/src/v/raft/state_machine.h b/src/v/raft/state_machine.h
index 2c7ac912fb3f2..d9a52b69d15de 100644
--- a/src/v/raft/state_machine.h
+++ b/src/v/raft/state_machine.h
@@ -88,6 +88,16 @@ class state_machine {
     ss::future<result<raft::replicate_result>>
       quorum_write_empty_batch(model::timeout_clock::time_point);
 
+    /**
+     * Sends a round of heartbeats to followers. When a majority of followers
+     * reply with success to either this or any subsequent request, all reads
+     * up to the returned offset are linearizable (i.e. a majority of
+     * followers have updated their commit indices to at least the returned
+     * offset). For more details see section 6.4 of the Raft dissertation.
+     */
+    ss::future<result<model::offset>>
+      insert_linearizable_barrier(model::timeout_clock::time_point);
+
     virtual ~state_machine() {}
 
 protected:
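The new method is meant to be called right after a write when the caller wants
followers to learn the updated commit index without waiting for the next
periodic heartbeat. A minimal sketch of that call pattern follows; `my_stm`
and `do_replicate` are hypothetical stand-ins, not code from this patch:

    ss::future<std::error_code> my_stm::replicate_and_sync(
      model::record_batch batch, model::timeout_clock::time_point timeout) {
        return do_replicate(std::move(batch), timeout)
          .then([this, timeout](std::error_code ec) {
              if (ec) {
                  return ss::make_ready_future<std::error_code>(ec);
              }
              // force a heartbeat round; the returned offset is not needed
              // here, only whether the barrier completed before the timeout
              return insert_linearizable_barrier(timeout).then(
                [](result<model::offset> barrier) {
                    return barrier ? std::error_code{} : barrier.error();
                });
          });
    }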
From 86d14853f080ba283209100a890a27f4d0ba0173 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 11:57:44 +0100
Subject: [PATCH 02/11] k/incremental_alter_configs: handle lexical cast exception

Added graceful handling of the lexical cast exception thrown when parsing
topic configuration property values.

Signed-off-by: Michal Maslanka
---
 .../handlers/incremental_alter_configs.cc     | 83 +++++++++++--------
 1 file changed, 47 insertions(+), 36 deletions(-)

diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc
index fd75cfd8a764b..c4455685f76ab 100644
--- a/src/v/kafka/server/handlers/incremental_alter_configs.cc
+++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc
@@ -31,6 +31,10 @@ namespace kafka {
 using req_resource_t = incremental_alter_configs_resource;
 using resp_resource_t = incremental_alter_configs_resource_response;
 
+/**
+ * We pass the returned value as a parameter to allow the template arguments
+ * to be deduced automatically.
+ */
 template<typename T>
 void parse_and_set_optional(
     cluster::property_update<std::optional<T>>& property,
@@ -134,43 +138,50 @@ create_topic_properties_update(incremental_alter_configs_resource& resource) {
             // error case
             return *err;
         }
-
-        if (cfg.name == topic_property_cleanup_policy) {
-            parse_and_set_optional(
-              update.properties.cleanup_policy_bitflags, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_compaction_strategy) {
-            parse_and_set_optional(
-              update.properties.compaction_strategy, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_compression) {
-            parse_and_set_optional(
-              update.properties.compression, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_segment_size) {
-            parse_and_set_optional(
-              update.properties.segment_size, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_timestamp_type) {
-            parse_and_set_optional(
-              update.properties.timestamp_type, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_retention_bytes) {
-            parse_and_set_tristate(
-              update.properties.retention_bytes, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_retention_duration) {
-            parse_and_set_tristate(
-              update.properties.retention_duration, cfg.value, op);
-            continue;
-        }
+        try {
+            if (cfg.name == topic_property_cleanup_policy) {
+                parse_and_set_optional(
+                  update.properties.cleanup_policy_bitflags, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_compaction_strategy) {
+                parse_and_set_optional(
+                  update.properties.compaction_strategy, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_compression) {
+                parse_and_set_optional(
+                  update.properties.compression, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_segment_size) {
+                parse_and_set_optional(
+                  update.properties.segment_size, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_timestamp_type) {
+                parse_and_set_optional(
+                  update.properties.timestamp_type, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_retention_bytes) {
+                parse_and_set_tristate(
+                  update.properties.retention_bytes, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_retention_duration) {
+                parse_and_set_tristate(
+                  update.properties.retention_duration, cfg.value, op);
+                continue;
+            }
+        } catch (const boost::bad_lexical_cast& e) {
+            return make_error_alter_config_resource_response<
+              incremental_alter_configs_resource_response>(
+              resource,
+              error_code::invalid_config,
+              fmt::format(
+                "unable to parse property {} value {}", cfg.name, cfg.value));
+        }
         // Unsupported property, return error
         return make_error_alter_config_resource_response<resp_resource_t>(
           resource,
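For reference, a standalone illustration of the failure mode handled above:
Boost reports parse failures by throwing rather than returning an error code.
This compiles on its own and is not code from the patch:

    #include <boost/lexical_cast.hpp>
    #include <iostream>

    int main() {
        try {
            // "abc" is not a number, so this throws bad_lexical_cast -
            // exactly what the handler now catches per property update
            auto v = boost::lexical_cast<size_t>("abc");
            std::cout << v << "\n";
        } catch (const boost::bad_lexical_cast& e) {
            std::cout << "unable to parse value: " << e.what() << "\n";
            return 1;
        }
    }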
From fb50c60775f90f01453217d31fbac67d6952478d Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 11:59:14 +0100
Subject: [PATCH 03/11] k/configs: fixed passing error code to response

Fixed passing the error code returned by `topics_frontend` through to the
alter configuration responses.

Signed-off-by: Michal Maslanka
---
 src/v/kafka/server/handlers/configs/config_utils.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h
index bf10fadc67de8..12ef1e071c676 100644
--- a/src/v/kafka/server/handlers/configs/config_utils.h
+++ b/src/v/kafka/server/handlers/configs/config_utils.h
@@ -118,7 +118,7 @@ ss::future<std::vector<R>> do_alter_topics_configuration(
       + config::shard_local_cfg().alter_topic_cfg_timeout_ms());
     for (auto& res : update_results) {
         responses.push_back(R{
-          .error_code = error_code::none,
+          .error_code = map_topic_error_code(res.ec),
           .resource_type = static_cast<int8_t>(config_resource_type::topic),
           .resource_name = res.tp_ns.tp(),
         });
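The fix relies on `map_topic_error_code` to translate a `cluster::errc` into
a Kafka `error_code` instead of unconditionally reporting success. The sketch
below only illustrates the shape of that translation; it is not the
function's actual body:

    // illustrative sketch - the real map_topic_error_code lives in the
    // kafka layer and covers many more cases
    kafka::error_code map_topic_error_code_sketch(cluster::errc ec) {
        switch (ec) {
        case cluster::errc::success:
            return kafka::error_code::none;
        case cluster::errc::timeout:
            return kafka::error_code::request_timed_out;
        case cluster::errc::no_leader_controller:
            return kafka::error_code::not_controller;
        default:
            return kafka::error_code::unknown_server_error;
        }
    }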
From 4949625da4a92b33ce8a7c1c996b9d84e2649035 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 12:01:04 +0100
Subject: [PATCH 04/11] c/service: implemented update_topic_properties rpc endpoint

Added `update_topic_properties` RPC method. This is required since both
`AlterConfigs` and `IncrementalAlterConfigs` requests can be sent to any
broker. Since topic properties updates are replicated through the controller,
the update request has to be forwarded to the current controller leader.

Signed-off-by: Michal Maslanka
---
 src/v/cluster/controller.json |  5 +++++
 src/v/cluster/service.cc      | 19 +++++++++++++++++++
 src/v/cluster/service.h       |  6 ++++++
 src/v/cluster/types.cc        | 14 ++++++++++++++
 src/v/cluster/types.h         | 14 ++++++++++++++
 5 files changed, 58 insertions(+)

diff --git a/src/v/cluster/controller.json b/src/v/cluster/controller.json
index e9c35fbe4b942..1e6fafca04291 100644
--- a/src/v/cluster/controller.json
+++ b/src/v/cluster/controller.json
@@ -24,6 +24,11 @@
       "name": "finish_partition_update",
       "input_type": "finish_partition_update_request",
       "output_type": "finish_partition_update_reply"
+    },
+    {
+      "name": "update_topic_properties",
+      "input_type": "update_topic_properties_request",
+      "output_type": "update_topic_properties_reply"
     }
   ]
 }
diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc
index 8e25ff2937c30..26c94f6f20949 100644
--- a/src/v/cluster/service.cc
+++ b/src/v/cluster/service.cc
@@ -129,4 +129,23 @@ service::do_finish_partition_update(finish_partition_update_request&& req) {
     co_return finish_partition_update_reply{.result = e};
 }
 
+ss::future<update_topic_properties_reply> service::update_topic_properties(
+  update_topic_properties_request&& req, rpc::streaming_context&) {
+    return ss::with_scheduling_group(
+      get_scheduling_group(), [this, req = std::move(req)]() mutable {
+          return do_update_topic_properties(std::move(req));
+      });
+}
+
+ss::future<update_topic_properties_reply>
+service::do_update_topic_properties(update_topic_properties_request&& req) {
+    // local topic frontend instance will eventually dispatch the request to
+    // the _raft0 core
+    auto res = co_await _topics_frontend.local().update_topic_properties(
+      req.updates,
+      config::shard_local_cfg().replicate_append_timeout_ms()
+        + model::timeout_clock::now());
+
+    co_return update_topic_properties_reply{.results = std::move(res)};
+}
 } // namespace cluster
diff --git a/src/v/cluster/service.h b/src/v/cluster/service.h
index 6a9fa3bbebfa0..1768fe46cebc1 100644
--- a/src/v/cluster/service.h
+++ b/src/v/cluster/service.h
@@ -43,6 +43,9 @@ class service : public controller_service {
     ss::future<finish_partition_update_reply> finish_partition_update(
       finish_partition_update_request&&, rpc::streaming_context&) final;
 
+    ss::future<update_topic_properties_reply> update_topic_properties(
+      update_topic_properties_request&&, rpc::streaming_context&) final;
+
 private:
     std::
       pair<std::vector<model::topic_metadata>, std::vector<topic_configuration>>
@@ -51,6 +54,9 @@ class service : public controller_service {
     ss::future<finish_partition_update_reply>
       do_finish_partition_update(finish_partition_update_request&&);
 
+    ss::future<update_topic_properties_reply>
+      do_update_topic_properties(update_topic_properties_request&&);
+
     ss::sharded<topics_frontend>& _topics_frontend;
     ss::sharded<members_manager>& _members_manager;
     ss::sharded<metadata_cache>& _md_cache;
diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc
index 3ec25d4a6f569..60d65c1d8dcff 100644
--- a/src/v/cluster/types.cc
+++ b/src/v/cluster/types.cc
@@ -322,4 +322,18 @@ adl<cluster::configuration_invariants>::from(iobuf_parser& parser) {
 
     return ret;
 }
+
+void adl<cluster::topic_properties_update>::to(
+  iobuf& out, cluster::topic_properties_update&& r) {
+    reflection::serialize(out, r.tp_ns, r.properties);
+}
+
+cluster::topic_properties_update
+adl<cluster::topic_properties_update>::from(iobuf_parser& parser) {
+    auto tp_ns = adl<model::topic_namespace>{}.from(parser);
+    cluster::topic_properties_update ret(std::move(tp_ns));
+    ret.properties = adl<cluster::incremental_topic_updates>{}.from(parser);
+
+    return ret;
+}
 } // namespace reflection
diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h
index d724cd8d38001..0600373eff3ab 100644
--- a/src/v/cluster/types.h
+++ b/src/v/cluster/types.h
@@ -208,6 +208,14 @@ struct finish_partition_update_reply {
     cluster::errc result;
 };
 
+struct update_topic_properties_request {
+    std::vector<topic_properties_update> updates;
+};
+
+struct update_topic_properties_reply {
+    std::vector<topic_result> results;
+};
+
 template<typename T>
 struct patch {
     std::vector<T> additions;
@@ -317,4 +325,10 @@ struct adl<cluster::configuration_invariants> {
     cluster::configuration_invariants from(iobuf_parser&);
 };
 
+template<>
+struct adl<cluster::topic_properties_update> {
+    void to(iobuf&, cluster::topic_properties_update&&);
+    cluster::topic_properties_update from(iobuf_parser&);
+};
+
 } // namespace reflection
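A quick note on the `adl<>` specialization above: `from()` must read fields
back in exactly the order `to()` serialized them. A round-trip sketch, where
`make_update()` is a hypothetical helper that builds a populated
`cluster::topic_properties_update`:

    iobuf buf;
    reflection::adl<cluster::topic_properties_update>{}.to(
      buf, make_update());

    iobuf_parser parser(std::move(buf));
    // deserializes tp_ns first, then properties - the same order in which
    // to() passed them to reflection::serialize()
    auto decoded
      = reflection::adl<cluster::topic_properties_update>{}.from(parser);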
From 182a0455118d0231a35ffee5faffd0aff64d93f2 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 12:05:24 +0100
Subject: [PATCH 05/11] c/topics_frontend: dispatching update_topic_properties to leader

Implemented dispatching topic properties updates to the leader controller.
Both `AlterConfigs` and `IncrementalAlterConfigs` can be executed on any of
the nodes, so we have to forward the topic properties update request to the
current leader controller.

Signed-off-by: Michal Maslanka
---
 src/v/cluster/topics_frontend.cc | 65 +++++++++++++++++++++++---------
 1 file changed, 48 insertions(+), 17 deletions(-)

diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 79d9c4268c5f6..01a96a79190d6 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -80,23 +80,6 @@ ss::future<std::vector<topic_result>> topics_frontend::create_topics(
       });
 }
 
-ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
-  std::vector<topic_properties_update> updates,
-  model::timeout_clock::time_point timeout) {
-    auto result = co_await _stm.invoke_on(
-      controller_stm_shard, [timeout](controller_stm& stm) {
-          return stm.quorum_write_empty_batch(timeout);
-      });
-
-    if (!result) {
-        co_return create_topic_results(updates, errc::not_leader_controller);
-    }
-    co_return co_await ssx::parallel_transform(
-      std::move(updates), [this, timeout](topic_properties_update update) {
-          return do_update_topic_properties(std::move(update), timeout);
-      });
-}
-
 cluster::errc map_errc(std::error_code ec) {
     if (ec == errc::success) {
         return errc::success;
     }
@@ -130,6 +113,54 @@ cluster::errc map_errc(std::error_code ec) {
     return errc::replication_error;
 }
 
+ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
+  std::vector<topic_properties_update> updates,
+  model::timeout_clock::time_point timeout) {
+    auto cluster_leader = _leaders.local().get_leader(model::controller_ntp);
+
+    // no leader available
+    if (!cluster_leader) {
+        co_return create_topic_results(updates, errc::no_leader_controller);
+    }
+
+    // current node is a leader, just replicate
+    if (cluster_leader == _self) {
+        // replicate empty batch to make sure leader local state is up to date.
+        auto result = co_await _stm.invoke_on(
+          controller_stm_shard, [timeout](controller_stm& stm) {
+              return stm.quorum_write_empty_batch(timeout);
+          });
+        if (!result) {
+            co_return create_topic_results(updates, map_errc(result.error()));
+        }
+
+        co_return co_await ssx::parallel_transform(
+          std::move(updates), [this, timeout](topic_properties_update update) {
+              return do_update_topic_properties(std::move(update), timeout);
+          });
+    }
+
+    co_return co_await _connections.local()
+      .with_node_client<controller_client_protocol>(
+        _self,
+        ss::this_shard_id(),
+        *cluster_leader,
+        timeout,
+        [updates, timeout](controller_client_protocol client) mutable {
+            return client
+              .update_topic_properties(
+                update_topic_properties_request{.updates = std::move(updates)},
+                rpc::client_opts(timeout))
+              .then(&rpc::get_ctx_data<update_topic_properties_reply>);
+        })
+      .then([updates](result<update_topic_properties_reply> r) {
+          if (r.has_error()) {
+              return create_topic_results(updates, map_errc(r.error()));
+          }
+          return std::move(r.value().results);
+      });
+}
+
 ss::future<topic_result> topics_frontend::do_update_topic_properties(
   topic_properties_update update, model::timeout_clock::time_point timeout) {
     update_topic_properties_cmd cmd(update.tp_ns, update.properties);
From a6a672622d2cf43c0a842a5de970cc95a52a3c42 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 12:13:37 +0100
Subject: [PATCH 06/11] c/topics_frontend: leveraging linearizable barrier for topic operations

Leveraging the raft linearizable barrier to speed up applying commands on
followers.

In Kafka, metadata is eventually consistent. Our metadata cache and
controller design leverage that fact (we do not require linearizable reads
when requesting metadata). In raft, followers do not learn about committed
index updates until they receive a heartbeat following the commit index
update on the leader. In redpanda, heartbeats are sent every 150ms by
default, so follower metadata is updated with a delay of up to ~150ms.

This propagation delay, together with the fact that clients request metadata
updates from an arbitrary broker in the cluster, may lead to a situation in
which clients receive stale metadata. That is correct from the protocol
perspective, but it can be misleading and can add latency whenever the
client's backoff timeout is triggered.

In order to reduce the metadata propagation lag we use the
`raft::insert_linearizable_barrier` method: triggering a round of heartbeats
immediately after data is replicated shortens the delay before followers
observe the new commit index.

Signed-off-by: Michal Maslanka
---
 src/v/cluster/topics_frontend.cc | 49 ++++++++++++++++++++++++++++++--
 src/v/cluster/topics_frontend.h  |  4 +++
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 01a96a79190d6..1cd65b54d786c 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -47,6 +47,14 @@ topics_frontend::topics_frontend(
   , _leaders(l)
   , _as(as) {}
 
+static bool
+needs_linearizable_barrier(const std::vector<topic_result>& results) {
+    return std::any_of(
+      results.cbegin(), results.cend(), [](const topic_result& r) {
+          return r.ec == errc::success;
+      });
+}
+
 ss::future<std::vector<topic_result>> topics_frontend::create_topics(
   std::vector<topic_configuration> topics,
   model::timeout_clock::time_point timeout) {
@@ -77,6 +85,16 @@ ss::future<std::vector<topic_result>> topics_frontend::create_topics(
           });
 
           return ss::when_all_succeed(futures.begin(), futures.end());
+      })
+      .then([this, timeout](std::vector<topic_result> results) {
+          if (needs_linearizable_barrier(results)) {
+              return stm_linearizable_barrier(timeout).then(
+                [results = std::move(results)](result<model::offset>) mutable {
+                    return results;
+                });
+          }
+          return ss::make_ready_future<std::vector<topic_result>>(
+            std::move(results));
       });
 }
 
@@ -134,10 +152,20 @@ ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
             co_return create_topic_results(updates, map_errc(result.error()));
         }
 
-        co_return co_await ssx::parallel_transform(
+        auto results = co_await ssx::parallel_transform(
           std::move(updates), [this, timeout](topic_properties_update update) {
              return do_update_topic_properties(std::move(update), timeout);
          });
+
+        // we are not really interested in the result coming from the
+        // linearizable barrier; results coming from the previous steps will
+        // be propagated to clients. This is just an optimization and does not
+        // affect the correctness of the protocol.
+        if (needs_linearizable_barrier(results)) {
+            co_await stm_linearizable_barrier(timeout).discard_result();
+        }
+
+        co_return results;
     }
 
     co_return co_await _connections.local()
@@ -297,7 +325,17 @@ ss::future<std::vector<topic_result>> topics_frontend::delete_topics(
           return do_delete_topic(std::move(tp_ns), timeout);
       });
 
-    return ss::when_all_succeed(futures.begin(), futures.end());
+    return ss::when_all_succeed(futures.begin(), futures.end())
+      .then([this, timeout](std::vector<topic_result> results) {
+          if (needs_linearizable_barrier(results)) {
+              return stm_linearizable_barrier(timeout).then(
+                [results = std::move(results)](result<model::offset>) mutable {
+                    return results;
+                });
+          }
+          return ss::make_ready_future<std::vector<topic_result>>(
+            std::move(results));
+      });
 }
 
 ss::future<topic_result> topics_frontend::do_delete_topic(
@@ -432,4 +470,11 @@ ss::future<std::error_code> topics_frontend::finish_moving_partition_replicas(
       });
 }
 
+ss::future<result<model::offset>> topics_frontend::stm_linearizable_barrier(
+  model::timeout_clock::time_point timeout) {
+    return _stm.invoke_on(controller_stm_shard, [timeout](controller_stm& stm) {
+        return stm.insert_linearizable_barrier(timeout);
+    });
+}
+
 } // namespace cluster
diff --git a/src/v/cluster/topics_frontend.h b/src/v/cluster/topics_frontend.h
index b9d0f55fcaa62..a4409f542fcd9 100644
--- a/src/v/cluster/topics_frontend.h
+++ b/src/v/cluster/topics_frontend.h
@@ -84,6 +84,10 @@ class topics_frontend {
     ss::future<topic_result> do_update_topic_properties(
       topic_properties_update, model::timeout_clock::time_point);
     ss::future<> update_leaders_with_estimates(std::vector<ntp_leader>);
+
+    ss::future<result<model::offset>>
+      stm_linearizable_barrier(model::timeout_clock::time_point);
+
     // returns true if the topic name is valid
     static bool validate_topic_name(const model::topic_namespace&);
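To make the latency argument concrete: with the default 150ms heartbeat
interval, a follower can serve metadata that is up to ~150ms stale, while the
barrier shrinks that window to roughly one network round trip. The schematic
fragment below shows where that cost lands on the client side;
`fetch_metadata`, `random_broker` and `contains_topic` are hypothetical
stand-ins for whatever the client library provides, not redpanda code:

    metadata md = fetch_metadata(random_broker());
    while (!contains_topic(md, "my-topic")) {
        // without the barrier a follower may lag the leader by up to one
        // heartbeat interval, so this backoff fires even though the topic
        // already exists on the leader
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        md = fetch_metadata(random_broker());
    }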
From f8d1f939042f3916f5940adbe5644dfd4fa7222e Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 13:13:40 +0100
Subject: [PATCH 07/11] k/describe_configs: fixed printing property names

We have to use the standard names of topic properties. Replaced custom
strings with the predefined constants holding all topic property names.

Signed-off-by: Michal Maslanka
---
 src/v/kafka/server/handlers/describe_configs.cc | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc
index 741e778dbc98a..81a565a56a580 100644
--- a/src/v/kafka/server/handlers/describe_configs.cc
+++ b/src/v/kafka/server/handlers/describe_configs.cc
@@ -13,6 +13,7 @@
 #include "config/configuration.h"
 #include "kafka/protocol/errors.h"
 #include "kafka/server/handlers/topics/topic_utils.h"
+#include "kafka/server/handlers/topics/types.h"
 #include "kafka/server/request_context.h"
 #include "kafka/server/response.h"
 #include "model/fundamental.h"
@@ -35,11 +36,11 @@ namespace kafka {
 template<typename T>
 static void add_config(
   describe_configs_result& result,
-  const char* name,
+  std::string_view name,
   T value,
   describe_configs_source source) {
     result.configs.push_back(describe_configs_resource_result{
-      .name = name,
+      .name = ss::sstring(name),
       .value = fmt::format("{}", value),
       .config_source = source,
     });
@@ -193,7 +194,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
             add_config(
               result,
-              "cleanup.policy",
+              topic_property_cleanup_policy,
               describe_topic_cleanup_policy(
                 topic_config,
                 ctx.metadata_cache().get_default_cleanup_policy_bitflags()),
@@ -201,14 +202,14 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
             add_config(
               result,
-              "compression.type",
+              topic_property_compression,
               topic_config->properties.compression.value_or(
                 ctx.metadata_cache().get_default_compression()),
               describe_configs_source::topic);
 
             add_config(
               result,
-              "segment.bytes",
+              topic_property_segment_size,
               topic_config->properties.segment_size.value_or(
                 topic_config->properties.is_compacted()
                   ? ctx.metadata_cache()
                      .get_default_compacted_topic_segment_size()
                   : ctx.metadata_cache().get_default_segment_size()),
               describe_configs_source::topic);
@@ -218,7 +219,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
             add_config(
               result,
-              "retention.ms",
+              topic_property_retention_duration,
               describe_retention_duration(
                 topic_config->properties.retention_duration,
                 ctx.metadata_cache().get_default_retention_duration()),
@@ -226,7 +227,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
             add_config(
               result,
-              "retention.bytes",
+              topic_property_retention_bytes,
               describe_retention_bytes(
                 topic_config->properties.retention_bytes,
                 ctx.metadata_cache().get_default_retention_bytes()),
@@ -234,7 +235,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
             add_config(
               result,
-              "timestamp.type",
+              topic_property_timestamp_type,
               topic_config->properties.timestamp_type.value_or(
                 ctx.metadata_cache().get_default_timestamp_type()),
               describe_configs_source::topic);

From 0a6d879f827dabe073b9e9f3f63de43a5dddbcb0 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 13:15:32 +0100
Subject: [PATCH 08/11] m/model: fixed printing timestamp type

We have to make sure that all our types are presented to clients the way
Kafka presents them. Fixed printing out `model::timestamp_type`.

Signed-off-by: Michal Maslanka
---
 src/v/model/model.cc                             |  8 ++++++--
 src/v/model/tests/CMakeLists.txt                 |  2 +-
 ...exical_cast_test.cc => lexical_cast_tests.cc} | 16 ++++++++++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)
 rename src/v/model/tests/{compression_lexical_cast_test.cc => lexical_cast_tests.cc} (82%)

diff --git a/src/v/model/model.cc b/src/v/model/model.cc
index 4d5279d2edab7..c287f47bb19ea 100644
--- a/src/v/model/model.cc
+++ b/src/v/model/model.cc
@@ -123,11 +123,15 @@ operator<<(std::ostream& o, const model::topic_namespace_view& tp_ns) {
 }
 
 std::ostream& operator<<(std::ostream& os, timestamp_type ts) {
+    /**
+     * We need to use specific string representations of timestamp_type as
+     * this is related to protocol correctness
+     */
     switch (ts) {
     case timestamp_type::append_time:
-        return os << "append_time";
+        return os << "LogAppendTime";
     case timestamp_type::create_time:
-        return os << "create_time";
+        return os << "CreateTime";
     }
     return os << "{unknown timestamp:" << static_cast<int>(ts) << "}";
 }
diff --git a/src/v/model/tests/CMakeLists.txt b/src/v/model/tests/CMakeLists.txt
index 4447effc13646..64d87dc2073d5 100644
--- a/src/v/model/tests/CMakeLists.txt
+++ b/src/v/model/tests/CMakeLists.txt
@@ -2,7 +2,7 @@ rp_test(
   UNIT_TEST
   BINARY_NAME model
   SOURCES
-    compression_lexical_cast_test.cc
+    lexical_cast_tests.cc
     ntp_path_test.cc
     topic_view_tests.cc
   DEFINITIONS BOOST_TEST_DYN_LINK
diff --git a/src/v/model/tests/compression_lexical_cast_test.cc b/src/v/model/tests/lexical_cast_tests.cc
similarity index 82%
rename from src/v/model/tests/compression_lexical_cast_test.cc
rename to src/v/model/tests/lexical_cast_tests.cc
index b866b5077629e..af5057487f722 100644
--- a/src/v/model/tests/compression_lexical_cast_test.cc
+++ b/src/v/model/tests/lexical_cast_tests.cc
@@ -69,3 +69,19 @@ BOOST_AUTO_TEST_CASE(removing_compression) {
     attr.remove_compression();
     BOOST_CHECK_EQUAL(attr.compression(), model::compression::none);
 };
+
+BOOST_AUTO_TEST_CASE(timestamp_type_cast_from_string) {
+    BOOST_CHECK_EQUAL(
+      boost::lexical_cast<model::timestamp_type>("CreateTime"),
+      model::timestamp_type::create_time);
+    BOOST_CHECK_EQUAL(
+      boost::lexical_cast<model::timestamp_type>("LogAppendTime"),
+      model::timestamp_type::append_time);
+};
+
+BOOST_AUTO_TEST_CASE(timestamp_type_printing) {
+    BOOST_CHECK_EQUAL(
+      "CreateTime", fmt::format("{}", model::timestamp_type::create_time));
+    BOOST_CHECK_EQUAL(
+      "LogAppendTime", fmt::format("{}", model::timestamp_type::append_time));
+};

From d3031f797ac6a4cc1fbfd3faf644cce6c1a11d6b Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 14:10:34 +0100
Subject: [PATCH 09/11] tests/ducky: added missing topic properties to topic spec

Signed-off-by: Michal Maslanka
---
 tests/rptest/clients/kafka_cli_tools.py |  9 ++---
 tests/rptest/clients/types.py           | 44 +++++++++++++++++++++----
 2 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py
index e10e235023cc2..ed13aa3dfdb86 100644
--- a/tests/rptest/clients/kafka_cli_tools.py
+++ b/tests/rptest/clients/kafka_cli_tools.py
@@ -90,14 +90,15 @@ def describe_topic(self, topic):
         configs = part[8:]
 
         def maybe_int(key, value):
-            if key in ["partition_count", "replication_factor"]:
+            if key in [
+                    "partition_count", "replication_factor", "retention_ms",
+                    "retention_bytes", "segment_bytes"
+            ]:
                 value = int(value)
             return value
 
         def fix_key(key):
-            if key == "cleanup.policy":
-                return "cleanup_policy"
-            return key
+            return key.replace(".", "_")
 
         self._redpanda.logger.debug(f"Describe topics configs: {configs}")
         configs = [config.split("=") for config in configs.split(",")]
diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py
index 590d401b25026..b48a7ba5d6932 100644
--- a/tests/rptest/clients/types.py
+++ b/tests/rptest/clients/types.py
@@ -20,16 +20,48 @@ class TopicSpec:
     CLEANUP_COMPACT = "compact"
     CLEANUP_DELETE = "delete"
 
-    def __init__(self,
-                 *,
-                 name=None,
-                 partition_count=1,
-                 replication_factor=3,
-                 cleanup_policy=CLEANUP_DELETE):
+    PROPERTY_COMPRESSION = "compression.type"
+    PROPERTY_CLEANUP_POLICY = "cleanup.policy"
+    PROPERTY_COMPACTION_STRATEGY = "compaction.strategy"
+    PROPERTY_TIMESTAMP_TYPE = "message.timestamp.type"
+    PROPERTY_SEGMENT_SIZE = "segment.bytes"
+    PROPERTY_RETENTION_BYTES = "retention.bytes"
+    PROPERTY_RETENTION_TIME = "retention.ms"
+
+    # compression types
+    COMPRESSION_NONE = "none"
+    COMPRESSION_PRODUCER = "producer"
+    COMPRESSION_GZIP = "gzip"
+    COMPRESSION_LZ4 = "lz4"
+    COMPRESSION_SNAPPY = "snappy"
+    COMPRESSION_ZSTD = "zstd"
+
+    # timestamp types
+    TIMESTAMP_CREATE_TIME = "CreateTime"
+    TIMESTAMP_LOG_APPEND_TIME = "LogAppendTime"
+
+    def __init__(
+            self,
+            *,
+            name=None,
+            partition_count=1,
+            replication_factor=3,
+            cleanup_policy=CLEANUP_DELETE,
+            compression_type=COMPRESSION_PRODUCER,
+            message_timestamp_type=TIMESTAMP_CREATE_TIME,
+            segment_bytes=1 * (2**30),  # 1 GiB; note `2 ^ 30` would be XOR
+            retention_bytes=-1,
+            retention_ms=(7 * 24 * 3600 * 1000)  # one week
+    ):
         self.name = name or f"topic-{self._random_topic_suffix()}"
         self.partition_count = partition_count
         self.replication_factor = replication_factor
         self.cleanup_policy = cleanup_policy
+        self.compression_type = compression_type
+        self.message_timestamp_type = message_timestamp_type
+        self.segment_bytes = segment_bytes
+        self.retention_bytes = retention_bytes
+        self.retention_ms = retention_ms
 
     def __str__(self):
         return self.name

From e544bd90ee72efd20e61143d46a32debdfda95e7 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 14:13:07 +0100
Subject: [PATCH 10/11] ducky/kafka_cli_tools: added alter_topic_config method

Signed-off-by: Michal Maslanka
---
 tests/rptest/clients/kafka_cli_tools.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py
index ed13aa3dfdb86..cf526734dd7ab 100644
--- a/tests/rptest/clients/kafka_cli_tools.py
+++ b/tests/rptest/clients/kafka_cli_tools.py
@@ -113,6 +113,17 @@ def describe_broker_config(self):
         self._redpanda.logger.debug("Describe brokers config result: %s", res)
         return res
 
+    def alter_topic_config(self, topic, configuration_map):
+        self._redpanda.logger.debug("Altering topic %s configuration with %s",
+                                    topic, configuration_map)
+        args = ["--topic", topic, "--alter"]
+        args.append("--add-config")
+        args.append(",".join(
+            map(lambda item: f"{item[0]}={item[1]}",
+                configuration_map.items())))
+
+        return self._run("kafka-configs.sh", args)
+
     def produce(self,
                 topic,
                 num_records,

From a58870b7337275af81ed3e91b4dccbfd21ae61af Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 24 Mar 2021 14:47:38 +0100
Subject: [PATCH 11/11] ducky: implemented tests for altering topic properties

Signed-off-by: Michal Maslanka
---
 .../tests/alter_topic_configuration_test.py   | 89 +++++++++++++++++++
 1 file changed, 89 insertions(+)
 create mode 100644 tests/rptest/tests/alter_topic_configuration_test.py

diff --git a/tests/rptest/tests/alter_topic_configuration_test.py b/tests/rptest/tests/alter_topic_configuration_test.py
new file mode 100644
index 0000000000000..a3385d467da18
--- /dev/null
+++ b/tests/rptest/tests/alter_topic_configuration_test.py
@@ -0,0 +1,89 @@
+# Copyright 2020 Vectorized, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+import random
+import string
+import time
+
+from ducktape.mark.resource import cluster
+from ducktape.mark import parametrize
+from ducktape.tests.test import test_logger
+from rptest.clients.kafka_cli_tools import KafkaCliTools
+
+from rptest.clients.types import TopicSpec
+from rptest.tests.redpanda_test import RedpandaTest
+
+
+class AlterTopicConfiguration(RedpandaTest):
+    """
+    Tests changing topic configuration properties at runtime.
+ """ + topics = (TopicSpec(partition_count=1, replication_factor=3), ) + log_level = 'trace' + + def __init__(self, test_context): + super(AlterTopicConfiguration, + self).__init__(test_context=test_context, + num_brokers=3, + topics=self.topics, + log_level='trace') + + self.kafka_tools = KafkaCliTools(self.redpanda) + + @cluster(num_nodes=3) + @parametrize(property=TopicSpec.PROPERTY_CLEANUP_POLICY, value="compact") + @parametrize(property=TopicSpec.PROPERTY_SEGMENT_SIZE, + value=10 * (2 << 20)) + @parametrize(property=TopicSpec.PROPERTY_RETENTION_BYTES, + value=200 * (2 << 20)) + @parametrize(property=TopicSpec.PROPERTY_RETENTION_TIME, value=360000) + @parametrize(property=TopicSpec.PROPERTY_TIMESTAMP_TYPE, + value="LogAppendTime") + def test_altering_topic_configuration(self, property, value): + topic = self.topics[0].name + kafka_tools = KafkaCliTools(self.redpanda) + res = kafka_tools.alter_topic_config(topic, {property: value}) + spec = kafka_tools.describe_topic(topic) + + @cluster(num_nodes=3) + def test_altering_multiple_topic_configurations(self): + topic = self.topics[0].name + kafka_tools = KafkaCliTools(self.redpanda) + res = kafka_tools.alter_topic_config( + topic, { + TopicSpec.PROPERTY_SEGMENT_SIZE: 1024, + TopicSpec.PROPERTY_RETENTION_TIME: 360000, + TopicSpec.PROPERTY_TIMESTAMP_TYPE: "LogAppendTime" + }) + spec = kafka_tools.describe_topic(topic) + + assert spec.segment_bytes == 1024 + assert spec.retention_ms == 360000 + assert spec.message_timestamp_type == "LogAppendTime" + + def random_string(self, size): + return ''.join( + random.choice(string.ascii_uppercase + string.digits) + for _ in range(size)) + + @cluster(num_nodes=3) + def test_configuration_properties_name_validation(self): + topic = self.topics[0].name + kafka_tools = KafkaCliTools(self.redpanda) + spec = kafka_tools.describe_topic(topic) + for i in range(0, 5): + key = self.random_string(5) + try: + res = kafka_tools.alter_topic_config(topic, {key: "123"}) + except Exception as inst: + test_logger.info("exception %s", inst) + + new_spec = kafka_tools.describe_topic(topic) + # topic spec shouldn't change + assert new_spec == spec