diff --git a/src/v/cluster/controller.json b/src/v/cluster/controller.json
index e9c35fbe4b942..1e6fafca04291 100644
--- a/src/v/cluster/controller.json
+++ b/src/v/cluster/controller.json
@@ -24,6 +24,11 @@
             "name": "finish_partition_update",
            "input_type": "finish_partition_update_request",
            "output_type": "finish_partition_update_reply"
+        },
+        {
+            "name": "update_topic_properties",
+            "input_type": "update_topic_properties_request",
+            "output_type": "update_topic_properties_reply"
         }
     ]
 }
diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc
index 8e25ff2937c30..26c94f6f20949 100644
--- a/src/v/cluster/service.cc
+++ b/src/v/cluster/service.cc
@@ -129,4 +129,23 @@ service::do_finish_partition_update(finish_partition_update_request&& req) {
     co_return finish_partition_update_reply{.result = e};
 }
 
+ss::future<update_topic_properties_reply> service::update_topic_properties(
+  update_topic_properties_request&& req, rpc::streaming_context&) {
+    return ss::with_scheduling_group(
+      get_scheduling_group(), [this, req = std::move(req)]() mutable {
+          return do_update_topic_properties(std::move(req));
+      });
+}
+
+ss::future<update_topic_properties_reply>
+service::do_update_topic_properties(update_topic_properties_request&& req) {
+    // the local topic frontend instance will eventually dispatch the
+    // request to the _raft0 core
+    auto res = co_await _topics_frontend.local().update_topic_properties(
+      req.updates,
+      config::shard_local_cfg().replicate_append_timeout_ms()
+        + model::timeout_clock::now());
+
+    co_return update_topic_properties_reply{.results = std::move(res)};
+}
 } // namespace cluster
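The controller.json entry above is what the RPC code generator consumes, and src/v/cluster/service.cc supplies the matching handler. A rough sketch of the server-side interface the generator is expected to emit from that JSON entry (method and type names are inferred from the "name"/"input_type"/"output_type" fields; the generated header itself is not part of this patch):

    // sketch only - the real generated code differs in detail
    class controller_service {
    public:
        virtual ss::future<update_topic_properties_reply> update_topic_properties(
          update_topic_properties_request&&, rpc::streaming_context&) = 0;
        // ...one virtual method per entry in the "methods" array...
    };

The final overrides added to src/v/cluster/service.h below implement exactly this pair of signatures.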
diff --git a/src/v/cluster/service.h b/src/v/cluster/service.h
index 6a9fa3bbebfa0..1768fe46cebc1 100644
--- a/src/v/cluster/service.h
+++ b/src/v/cluster/service.h
@@ -43,6 +43,9 @@ class service : public controller_service {
     ss::future<finish_partition_update_reply> finish_partition_update(
       finish_partition_update_request&&, rpc::streaming_context&) final;
 
+    ss::future<update_topic_properties_reply> update_topic_properties(
+      update_topic_properties_request&&, rpc::streaming_context&) final;
+
 private:
     std::
       pair, std::vector>
@@ -51,6 +54,9 @@ class service : public controller_service {
     ss::future<finish_partition_update_reply>
       do_finish_partition_update(finish_partition_update_request&&);
 
+    ss::future<update_topic_properties_reply>
+      do_update_topic_properties(update_topic_properties_request&&);
+
     ss::sharded<topics_frontend>& _topics_frontend;
     ss::sharded<members_manager>& _members_manager;
     ss::sharded<metadata_cache>& _md_cache;
diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 79d9c4268c5f6..1cd65b54d786c 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -47,6 +47,14 @@ topics_frontend::topics_frontend(
   , _leaders(l)
   , _as(as) {}
 
+static bool
+needs_linearizable_barrier(const std::vector<topic_result>& results) {
+    return std::any_of(
+      results.cbegin(), results.cend(), [](const topic_result& r) {
+          return r.ec == errc::success;
+      });
+}
+
 ss::future<std::vector<topic_result>> topics_frontend::create_topics(
   std::vector<topic_configuration> topics,
   model::timeout_clock::time_point timeout) {
@@ -77,23 +85,16 @@ ss::future<std::vector<topic_result>> topics_frontend::create_topics(
           });
 
         return ss::when_all_succeed(futures.begin(), futures.end());
-    });
-}
-
-ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
-  std::vector<topic_properties_update> updates,
-  model::timeout_clock::time_point timeout) {
-    auto result = co_await _stm.invoke_on(
-      controller_stm_shard, [timeout](controller_stm& stm) {
-          return stm.quorum_write_empty_batch(timeout);
-      });
-
-    if (!result) {
-        co_return create_topic_results(updates, errc::not_leader_controller);
-    }
-    co_return co_await ssx::parallel_transform(
-      std::move(updates), [this, timeout](topic_properties_update update) {
-          return do_update_topic_properties(std::move(update), timeout);
+    })
+      .then([this, timeout](std::vector<topic_result> results) {
+          if (needs_linearizable_barrier(results)) {
+              return stm_linearizable_barrier(timeout).then(
+                [results = std::move(results)](result<model::offset>) mutable {
+                    return results;
+                });
+          }
+          return ss::make_ready_future<std::vector<topic_result>>(
+            std::move(results));
       });
 }
 
@@ -130,6 +131,64 @@ cluster::errc map_errc(std::error_code ec) {
     return errc::replication_error;
 }
 
+ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
+  std::vector<topic_properties_update> updates,
+  model::timeout_clock::time_point timeout) {
+    auto cluster_leader = _leaders.local().get_leader(model::controller_ntp);
+
+    // no leader available
+    if (!cluster_leader) {
+        co_return create_topic_results(updates, errc::no_leader_controller);
+    }
+
+    // the current node is the leader - handle the request locally
+    if (cluster_leader == _self) {
+        // replicate an empty batch to make sure the leader's local state is
+        // up to date
+        auto result = co_await _stm.invoke_on(
+          controller_stm_shard, [timeout](controller_stm& stm) {
+              return stm.quorum_write_empty_batch(timeout);
+          });
+        if (!result) {
+            co_return create_topic_results(updates, map_errc(result.error()));
+        }
+
+        auto results = co_await ssx::parallel_transform(
+          std::move(updates),
+          [this, timeout](topic_properties_update update) {
+              return do_update_topic_properties(std::move(update), timeout);
+          });
+
+        // We are not really interested in the result coming from the
+        // linearizable barrier; the results coming from the previous steps
+        // will be propagated to clients. The barrier is just an optimization
+        // and does not affect the correctness of the protocol.
+        if (needs_linearizable_barrier(results)) {
+            co_await stm_linearizable_barrier(timeout).discard_result();
+        }
+
+        co_return results;
+    }
+
+    co_return co_await _connections.local()
+      .with_node_client<controller_client_protocol>(
+        _self,
+        ss::this_shard_id(),
+        *cluster_leader,
+        timeout,
+        [updates, timeout](controller_client_protocol client) mutable {
+            return client
+              .update_topic_properties(
+                update_topic_properties_request{.updates = std::move(updates)},
+                rpc::client_opts(timeout))
+              .then(&rpc::get_ctx_data<update_topic_properties_reply>);
+        })
+      .then([updates](result<update_topic_properties_reply> r) {
+          if (r.has_error()) {
+              return create_topic_results(updates, map_errc(r.error()));
+          }
+          return std::move(r.value().results);
+      });
+}
+
 ss::future<topic_result> topics_frontend::do_update_topic_properties(
   topic_properties_update update, model::timeout_clock::time_point timeout) {
     update_topic_properties_cmd cmd(update.tp_ns, update.properties);
@@ -266,7 +325,17 @@ ss::future<std::vector<topic_result>> topics_frontend::delete_topics(
           return do_delete_topic(std::move(tp_ns), timeout);
       });
 
-    return ss::when_all_succeed(futures.begin(), futures.end());
+    return ss::when_all_succeed(futures.begin(), futures.end())
+      .then([this, timeout](std::vector<topic_result> results) {
+          if (needs_linearizable_barrier(results)) {
+              return stm_linearizable_barrier(timeout).then(
+                [results = std::move(results)](result<model::offset>) mutable {
+                    return results;
+                });
+          }
+          return ss::make_ready_future<std::vector<topic_result>>(
+            std::move(results));
+      });
 }
 
 ss::future<topic_result> topics_frontend::do_delete_topic(
@@ -401,4 +470,11 @@ ss::future<std::error_code> topics_frontend::finish_moving_partition_replicas(
       });
 }
 
+ss::future<result<model::offset>> topics_frontend::stm_linearizable_barrier(
+  model::timeout_clock::time_point timeout) {
+    return _stm.invoke_on(controller_stm_shard, [timeout](controller_stm& stm) {
+        return stm.insert_linearizable_barrier(timeout);
+    });
+}
+
 } // namespace cluster
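The same barrier pattern appears three times above (create, delete, and the local-leader branch of update): replicate or apply first, then, only if at least one per-topic result succeeded, wait on stm_linearizable_barrier before answering the client. A successful barrier guarantees that a majority of the controller group has acknowledged everything replicated so far, so a follow-up read served from the leader's state machine observes those writes. A minimal usage sketch, assuming a controller_stm& stm and a deadline timeout are in scope (the wait-for-apply step is illustrative and not part of this patch):

    // sketch: linearizable read of controller state
    auto barrier = co_await stm.insert_linearizable_barrier(timeout);
    if (barrier) {
        // the returned offset is committed on a majority; once the local
        // state machine has applied up to it, reads reflect all prior writes
        co_await stm.wait(barrier.value(), timeout);
    }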
diff --git a/src/v/cluster/topics_frontend.h b/src/v/cluster/topics_frontend.h
index b9d0f55fcaa62..a4409f542fcd9 100644
--- a/src/v/cluster/topics_frontend.h
+++ b/src/v/cluster/topics_frontend.h
@@ -84,6 +84,10 @@ class topics_frontend {
     ss::future<topic_result> do_update_topic_properties(
       topic_properties_update, model::timeout_clock::time_point);
     ss::future<> update_leaders_with_estimates(std::vector);
+
+    ss::future<result<model::offset>>
+      stm_linearizable_barrier(model::timeout_clock::time_point);
+
     // returns true if the topic name is valid
     static bool validate_topic_name(const model::topic_namespace&);
diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc
index 3ec25d4a6f569..60d65c1d8dcff 100644
--- a/src/v/cluster/types.cc
+++ b/src/v/cluster/types.cc
@@ -322,4 +322,18 @@ adl<cluster::configuration_invariants>::from(iobuf_parser& parser) {
 
     return ret;
 }
+
+void adl<cluster::topic_properties_update>::to(
+  iobuf& out, cluster::topic_properties_update&& r) {
+    reflection::serialize(out, r.tp_ns, r.properties);
+}
+
+cluster::topic_properties_update
+adl<cluster::topic_properties_update>::from(iobuf_parser& parser) {
+    auto tp_ns = adl<model::topic_namespace>{}.from(parser);
+    cluster::topic_properties_update ret(std::move(tp_ns));
+    ret.properties = adl<cluster::incremental_topic_updates>{}.from(parser);
+
+    return ret;
+}
 } // namespace reflection
diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h
index d724cd8d38001..0600373eff3ab 100644
--- a/src/v/cluster/types.h
+++ b/src/v/cluster/types.h
@@ -208,6 +208,14 @@ struct finish_partition_update_reply {
     cluster::errc result;
 };
 
+struct update_topic_properties_request {
+    std::vector<topic_properties_update> updates;
+};
+
+struct update_topic_properties_reply {
+    std::vector<topic_result> results;
+};
+
 template<typename T>
 struct patch {
     std::vector<T> additions;
@@ -317,4 +325,10 @@ struct adl<cluster::configuration_invariants> {
     cluster::configuration_invariants from(iobuf_parser&);
 };
 
+template<>
+struct adl<cluster::topic_properties_update> {
+    void to(iobuf&, cluster::topic_properties_update&&);
+    cluster::topic_properties_update from(iobuf_parser&);
+};
+
 } // namespace reflection
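The adl<> specialization above gives topic_properties_update a wire format for the new controller RPC. A hypothetical round-trip showing how the to/from pair composes (the update variable and the iobuf_parser construction are assumptions made for illustration):

    // sketch: serialize an update and decode it back
    iobuf out;
    reflection::adl<cluster::topic_properties_update>{}.to(out, std::move(update));

    iobuf_parser parser(std::move(out));
    auto decoded
      = reflection::adl<cluster::topic_properties_update>{}.from(parser);
    // decoded.tp_ns and decoded.properties now mirror the original update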
diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h
index bf10fadc67de8..12ef1e071c676 100644
--- a/src/v/kafka/server/handlers/configs/config_utils.h
+++ b/src/v/kafka/server/handlers/configs/config_utils.h
@@ -118,7 +118,7 @@ ss::future<std::vector<R>> do_alter_topics_configuration(
       + config::shard_local_cfg().alter_topic_cfg_timeout_ms());
     for (auto& res : update_results) {
         responses.push_back(R{
-          .error_code = error_code::none,
+          .error_code = map_topic_error_code(res.ec),
           .resource_type = static_cast<int8_t>(config_resource_type::topic),
           .resource_name = res.tp_ns.tp(),
         });
diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc
index 741e778dbc98a..81a565a56a580 100644
--- a/src/v/kafka/server/handlers/describe_configs.cc
+++ b/src/v/kafka/server/handlers/describe_configs.cc
@@ -13,6 +13,7 @@
 #include "config/configuration.h"
 #include "kafka/protocol/errors.h"
 #include "kafka/server/handlers/topics/topic_utils.h"
+#include "kafka/server/handlers/topics/types.h"
 #include "kafka/server/request_context.h"
 #include "kafka/server/response.h"
 #include "model/fundamental.h"
@@ -35,11 +36,11 @@ namespace kafka {
 template<typename T>
 static void add_config(
   describe_configs_result& result,
-  const char* name,
+  std::string_view name,
   T value,
   describe_configs_source source) {
     result.configs.push_back(describe_configs_resource_result{
-      .name = name,
+      .name = ss::sstring(name),
       .value = fmt::format("{}", value),
       .config_source = source,
     });
@@ -193,7 +194,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
     add_config(
       result,
-      "cleanup.policy",
+      topic_property_cleanup_policy,
       describe_topic_cleanup_policy(
         topic_config,
         ctx.metadata_cache().get_default_cleanup_policy_bitflags()),
@@ -201,14 +202,14 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
     add_config(
       result,
-      "compression.type",
+      topic_property_compression,
       topic_config->properties.compression.value_or(
        ctx.metadata_cache().get_default_compression()),
       describe_configs_source::topic);
 
     add_config(
       result,
-      "segment.bytes",
+      topic_property_segment_size,
       topic_config->properties.segment_size.value_or(
         topic_config->properties.is_compacted()
           ? ctx.metadata_cache()
@@ -218,7 +219,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
     add_config(
       result,
-      "retention.ms",
+      topic_property_retention_duration,
       describe_retention_duration(
         topic_config->properties.retention_duration,
         ctx.metadata_cache().get_default_retention_duration()),
@@ -226,7 +227,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
     add_config(
       result,
-      "retention.bytes",
+      topic_property_retention_bytes,
       describe_retention_bytes(
         topic_config->properties.retention_bytes,
         ctx.metadata_cache().get_default_retention_bytes()),
@@ -234,7 +235,7 @@ ss::future<response_ptr> describe_configs_handler::handle(
 
     add_config(
       result,
-      "timestamp.type",
+      topic_property_timestamp_type,
       topic_config->properties.timestamp_type.value_or(
         ctx.metadata_cache().get_default_timestamp_type()),
       describe_configs_source::topic);
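add_config stringifies every value with fmt::format("{}", value), which routes through each type's operator<<. That is why the timestamp_type printer changed in src/v/model/model.cc below must emit the exact spellings the Kafka protocol defines. For example, using the result object already built in the handler above:

    // with the model.cc change below this reports the Kafka protocol
    // spelling "LogAppendTime" rather than the internal "append_time"
    add_config(
      result,
      topic_property_timestamp_type,
      model::timestamp_type::append_time,
      describe_configs_source::topic);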
diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc
index fd75cfd8a764b..c4455685f76ab 100644
--- a/src/v/kafka/server/handlers/incremental_alter_configs.cc
+++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc
@@ -31,6 +31,10 @@ namespace kafka {
 using req_resource_t = incremental_alter_configs_resource;
 using resp_resource_t = incremental_alter_configs_resource_response;
 
+/**
+ * We pass the returned value as a parameter so that the template arguments
+ * can be deduced automatically.
+ */
 template<typename T>
 void parse_and_set_optional(
   cluster::property_update<std::optional<T>>& property,
@@ -134,43 +138,50 @@ create_topic_properties_update(incremental_alter_configs_resource& resource) {
             // error case
             return *err;
         }
-
-        if (cfg.name == topic_property_cleanup_policy) {
-            parse_and_set_optional(
-              update.properties.cleanup_policy_bitflags, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_compaction_strategy) {
-            parse_and_set_optional(
-              update.properties.compaction_strategy, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_compression) {
-            parse_and_set_optional(
-              update.properties.compression, cfg.value, op);
-            continue;
+        try {
+            if (cfg.name == topic_property_cleanup_policy) {
+                parse_and_set_optional(
+                  update.properties.cleanup_policy_bitflags, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_compaction_strategy) {
+                parse_and_set_optional(
+                  update.properties.compaction_strategy, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_compression) {
+                parse_and_set_optional(
+                  update.properties.compression, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_segment_size) {
+                parse_and_set_optional(
+                  update.properties.segment_size, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_timestamp_type) {
+                parse_and_set_optional(
+                  update.properties.timestamp_type, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_retention_bytes) {
+                parse_and_set_tristate(
+                  update.properties.retention_bytes, cfg.value, op);
+                continue;
+            }
+            if (cfg.name == topic_property_retention_duration) {
+                parse_and_set_tristate(
+                  update.properties.retention_duration, cfg.value, op);
+                continue;
+            }
+        } catch (const boost::bad_lexical_cast&) {
+            return make_error_alter_config_resource_response<
+              incremental_alter_configs_resource_response>(
+              resource,
+              error_code::invalid_config,
+              fmt::format(
+                "unable to parse property {} value {}", cfg.name, cfg.value));
         }
-        if (cfg.name == topic_property_segment_size) {
-            parse_and_set_optional(
-              update.properties.segment_size, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_timestamp_type) {
-            parse_and_set_optional(
-              update.properties.timestamp_type, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_retention_bytes) {
-            parse_and_set_tristate(
-              update.properties.retention_bytes, cfg.value, op);
-            continue;
-        }
-        if (cfg.name == topic_property_retention_duration) {
-            parse_and_set_tristate(
-              update.properties.retention_duration, cfg.value, op);
-            continue;
-        }
-
         // Unsupported property, return error
         return make_error_alter_config_resource_response<resp_resource_t>(
           resource,
diff --git a/src/v/model/model.cc b/src/v/model/model.cc
index 4d5279d2edab7..c287f47bb19ea 100644
--- a/src/v/model/model.cc
+++ b/src/v/model/model.cc
@@ -123,11 +123,15 @@ operator<<(std::ostream& o, const model::topic_namespace_view& tp_ns) {
 }
 
 std::ostream& operator<<(std::ostream& os, timestamp_type ts) {
+    /**
+     * These exact string representations of timestamp_type are required,
+     * as they are part of the Kafka protocol.
+     */
     switch (ts) {
     case timestamp_type::append_time:
-        return os << "append_time";
+        return os << "LogAppendTime";
     case timestamp_type::create_time:
-        return os << "create_time";
+        return os << "CreateTime";
     }
     return os << "{unknown timestamp:" << static_cast<int>(ts) << "}";
 }
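The alter path parses property values with boost::lexical_cast (hence the bad_lexical_cast handler above), and lexical_cast drives both directions through the stream operators, so the operator<< strings double as the accepted input spellings. A small illustration of the behavior the unit tests below pin down:

    #include <boost/lexical_cast.hpp>

    // "LogAppendTime" parses via the istream operator...
    auto ts = boost::lexical_cast<model::timestamp_type>("LogAppendTime");
    // ...and printing goes back through operator<<
    auto s = boost::lexical_cast<std::string>(ts); // "LogAppendTime"
    // an unknown spelling throws boost::bad_lexical_cast, which the
    // alter-configs handler maps to error_code::invalid_config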
diff --git a/src/v/model/tests/CMakeLists.txt b/src/v/model/tests/CMakeLists.txt
index 4447effc13646..64d87dc2073d5 100644
--- a/src/v/model/tests/CMakeLists.txt
+++ b/src/v/model/tests/CMakeLists.txt
@@ -2,7 +2,7 @@ rp_test(
   UNIT_TEST
   BINARY_NAME model
   SOURCES
-    compression_lexical_cast_test.cc
+    lexical_cast_tests.cc
     ntp_path_test.cc
     topic_view_tests.cc
   DEFINITIONS BOOST_TEST_DYN_LINK
diff --git a/src/v/model/tests/compression_lexical_cast_test.cc b/src/v/model/tests/lexical_cast_tests.cc
similarity index 82%
rename from src/v/model/tests/compression_lexical_cast_test.cc
rename to src/v/model/tests/lexical_cast_tests.cc
index b866b5077629e..af5057487f722 100644
--- a/src/v/model/tests/compression_lexical_cast_test.cc
+++ b/src/v/model/tests/lexical_cast_tests.cc
@@ -69,3 +69,19 @@ BOOST_AUTO_TEST_CASE(removing_compression) {
     attr.remove_compression();
     BOOST_CHECK_EQUAL(attr.compression(), model::compression::none);
 };
+
+BOOST_AUTO_TEST_CASE(timestamp_type_cast_from_string) {
+    BOOST_CHECK_EQUAL(
+      boost::lexical_cast<model::timestamp_type>("CreateTime"),
+      model::timestamp_type::create_time);
+    BOOST_CHECK_EQUAL(
+      boost::lexical_cast<model::timestamp_type>("LogAppendTime"),
+      model::timestamp_type::append_time);
+};
+
+BOOST_AUTO_TEST_CASE(timestamp_type_printing) {
+    BOOST_CHECK_EQUAL(
+      "CreateTime", fmt::format("{}", model::timestamp_type::create_time));
+    BOOST_CHECK_EQUAL(
+      "LogAppendTime", fmt::format("{}", model::timestamp_type::append_time));
+};
diff --git a/src/v/raft/state_machine.cc b/src/v/raft/state_machine.cc
index 7d9d8a39d20e0..aa7cba544e4b1 100644
--- a/src/v/raft/state_machine.cc
+++ b/src/v/raft/state_machine.cc
@@ -129,4 +129,12 @@ ss::future<> state_machine::write_last_applied(model::offset o) {
     return _raft->write_last_applied(o);
 }
 
+ss::future<result<model::offset>> state_machine::insert_linearizable_barrier(
+  model::timeout_clock::time_point timeout) {
+    return ss::with_timeout(timeout, _raft->linearizable_barrier())
+      .handle_exception_type([](const ss::timed_out_error&) {
+          return result<model::offset>(errc::timeout);
+      });
+}
+
 } // namespace raft
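One subtlety in insert_linearizable_barrier: ss::with_timeout does not cancel the underlying future, it only stops waiting for it. When the deadline wins, the returned future fails with ss::timed_out_error while the barrier itself may still complete in the background. The handle_exception_type continuation folds that one exception into the result<> error channel, so callers see errc::timeout instead of an exception, and any other failure still propagates. The shape, in isolation (with pending standing in for _raft->linearizable_barrier()):

    // sketch: fold a Seastar timeout into result<> instead of an exception
    ss::future<result<model::offset>> pending = /* barrier in flight */;
    return ss::with_timeout(timeout, std::move(pending))
      .handle_exception_type([](const ss::timed_out_error&) {
          return result<model::offset>(errc::timeout);
      });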
diff --git a/src/v/raft/state_machine.h b/src/v/raft/state_machine.h
index 2c7ac912fb3f2..d9a52b69d15de 100644
--- a/src/v/raft/state_machine.h
+++ b/src/v/raft/state_machine.h
@@ -88,6 +88,16 @@ class state_machine {
     ss::future<result<replicate_result>>
       quorum_write_empty_batch(model::timeout_clock::time_point);
 
+    /**
+     * Sends a round of heartbeats to followers. When a majority of followers
+     * has replied with success to either this or any subsequent request, all
+     * reads up to the returned offset are linearizable (i.e. a majority of
+     * followers have updated their commit indices to at least the returned
+     * offset). For details see section 6.4 of the Raft dissertation.
+     */
+    ss::future<result<model::offset>>
+      insert_linearizable_barrier(model::timeout_clock::time_point);
+
     virtual ~state_machine() {}
 
 protected:
diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py
index e10e235023cc2..cf526734dd7ab 100644
--- a/tests/rptest/clients/kafka_cli_tools.py
+++ b/tests/rptest/clients/kafka_cli_tools.py
@@ -90,14 +90,15 @@ def describe_topic(self, topic):
         configs = part[8:]
 
         def maybe_int(key, value):
-            if key in ["partition_count", "replication_factor"]:
+            if key in [
+                    "partition_count", "replication_factor", "retention_ms",
+                    "retention_bytes", "segment_bytes"
+            ]:
                 value = int(value)
             return value
 
         def fix_key(key):
-            if key == "cleanup.policy":
-                return "cleanup_policy"
-            return key
+            return key.replace(".", "_")
 
         self._redpanda.logger.debug(f"Describe topics configs: {configs}")
         configs = [config.split("=") for config in configs.split(",")]
@@ -112,6 +113,17 @@ def describe_broker_config(self):
         self._redpanda.logger.debug("Describe brokers config result: %s", res)
         return res
 
+    def alter_topic_config(self, topic, configuration_map):
+        self._redpanda.logger.debug("Altering topic %s configuration with %s",
+                                    topic, configuration_map)
+        args = ["--topic", topic, "--alter"]
+        args.append("--add-config")
+        args.append(",".join(
+            map(lambda item: f"{item[0]}={item[1]}",
+                configuration_map.items())))
+
+        return self._run("kafka-configs.sh", args)
+
     def produce(self,
                 topic,
                 num_records,
diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py
index 590d401b25026..b48a7ba5d6932 100644
--- a/tests/rptest/clients/types.py
+++ b/tests/rptest/clients/types.py
@@ -20,16 +20,48 @@ class TopicSpec:
     CLEANUP_COMPACT = "compact"
     CLEANUP_DELETE = "delete"
 
+    PROPERTY_COMPRESSION = "compression.type"
+    PROPERTY_CLEANUP_POLICY = "cleanup.policy"
+    PROPERTY_COMPACTION_STRATEGY = "compaction.strategy"
+    PROPERTY_TIMESTAMP_TYPE = "message.timestamp.type"
+    PROPERTY_SEGMENT_SIZE = "segment.bytes"
+    PROPERTY_RETENTION_BYTES = "retention.bytes"
+    PROPERTY_RETENTION_TIME = "retention.ms"
+
+    # compression types
+    COMPRESSION_NONE = "none"
+    COMPRESSION_PRODUCER = "producer"
+    COMPRESSION_GZIP = "gzip"
+    COMPRESSION_LZ4 = "lz4"
+    COMPRESSION_SNAPPY = "snappy"
+    COMPRESSION_ZSTD = "zstd"
+
+    # timestamp types
+    TIMESTAMP_CREATE_TIME = "CreateTime"
+    TIMESTAMP_LOG_APPEND_TIME = "LogAppendTime"
+
+    def __init__(
+            self,
+            *,
+            name=None,
+            partition_count=1,
+            replication_factor=3,
+            cleanup_policy=CLEANUP_DELETE,
+            compression_type=COMPRESSION_PRODUCER,
+            message_timestamp_type=TIMESTAMP_CREATE_TIME,
+            segment_bytes=1 * (2**30),  # 1 GiB
+            retention_bytes=-1,
+            retention_ms=(7 * 24 * 3600 * 1000)  # one week
+    ):
         self.name = name or f"topic-{self._random_topic_suffix()}"
         self.partition_count = partition_count
         self.replication_factor = replication_factor
         self.cleanup_policy = cleanup_policy
+        self.compression_type = compression_type
+        self.message_timestamp_type = message_timestamp_type
+        self.segment_bytes = segment_bytes
+        self.retention_bytes = retention_bytes
+        self.retention_ms = retention_ms
 
     def __str__(self):
         return self.name
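For reference, alter_topic_config shells out to Kafka's stock CLI, so a call such as kafka_tools.alter_topic_config(topic, {TopicSpec.PROPERTY_RETENTION_TIME: 360000, TopicSpec.PROPERTY_SEGMENT_SIZE: 1024}) is expected to run roughly `kafka-configs.sh --topic <topic> --alter --add-config retention.ms=360000,segment.bytes=1024`, with self._run (defined elsewhere in this file) assumed to supply the broker connection flags.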
diff --git a/tests/rptest/tests/alter_topic_configuration_test.py b/tests/rptest/tests/alter_topic_configuration_test.py
new file mode 100644
index 0000000000000..a3385d467da18
--- /dev/null
+++ b/tests/rptest/tests/alter_topic_configuration_test.py
@@ -0,0 +1,89 @@
+# Copyright 2020 Vectorized, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+import random
+import string
+
+from ducktape.mark.resource import cluster
+from ducktape.mark import parametrize
+from ducktape.tests.test import test_logger
+from rptest.clients.kafka_cli_tools import KafkaCliTools
+
+from rptest.clients.types import TopicSpec
+from rptest.tests.redpanda_test import RedpandaTest
+
+
+class AlterTopicConfiguration(RedpandaTest):
+    """
+    Alter a topic's configuration and validate the changes.
+    """
+    topics = (TopicSpec(partition_count=1, replication_factor=3), )
+
+    def __init__(self, test_context):
+        super(AlterTopicConfiguration,
+              self).__init__(test_context=test_context,
+                             num_brokers=3,
+                             topics=self.topics,
+                             log_level='trace')
+
+        self.kafka_tools = KafkaCliTools(self.redpanda)
+
+    @cluster(num_nodes=3)
+    @parametrize(property=TopicSpec.PROPERTY_CLEANUP_POLICY, value="compact")
+    @parametrize(property=TopicSpec.PROPERTY_SEGMENT_SIZE,
+                 value=10 * (2 << 20))
+    @parametrize(property=TopicSpec.PROPERTY_RETENTION_BYTES,
+                 value=200 * (2 << 20))
+    @parametrize(property=TopicSpec.PROPERTY_RETENTION_TIME, value=360000)
+    @parametrize(property=TopicSpec.PROPERTY_TIMESTAMP_TYPE,
+                 value="LogAppendTime")
+    def test_altering_topic_configuration(self, property, value):
+        # smoke test: the alter and describe round trips must succeed
+        topic = self.topics[0].name
+        kafka_tools = KafkaCliTools(self.redpanda)
+        kafka_tools.alter_topic_config(topic, {property: value})
+        kafka_tools.describe_topic(topic)
+
+    @cluster(num_nodes=3)
+    def test_altering_multiple_topic_configurations(self):
+        topic = self.topics[0].name
+        kafka_tools = KafkaCliTools(self.redpanda)
+        kafka_tools.alter_topic_config(
+            topic, {
+                TopicSpec.PROPERTY_SEGMENT_SIZE: 1024,
+                TopicSpec.PROPERTY_RETENTION_TIME: 360000,
+                TopicSpec.PROPERTY_TIMESTAMP_TYPE: "LogAppendTime"
+            })
+        spec = kafka_tools.describe_topic(topic)
+
+        assert spec.segment_bytes == 1024
+        assert spec.retention_ms == 360000
+        assert spec.message_timestamp_type == "LogAppendTime"
+
+    def random_string(self, size):
+        return ''.join(
+            random.choice(string.ascii_uppercase + string.digits)
+            for _ in range(size))
+
+    @cluster(num_nodes=3)
+    def test_configuration_properties_name_validation(self):
+        topic = self.topics[0].name
+        kafka_tools = KafkaCliTools(self.redpanda)
+        spec = kafka_tools.describe_topic(topic)
+        for _ in range(0, 5):
+            key = self.random_string(5)
+            try:
+                kafka_tools.alter_topic_config(topic, {key: "123"})
+            except Exception as inst:
+                test_logger.info("exception %s", inst)
+
+        new_spec = kafka_tools.describe_topic(topic)
+        # the topic spec shouldn't change
+        assert new_spec == spec