Skip to content

Commit

Permalink
Merge pull request #947 from mmaslankaprv/alter-config-tests
Browse files Browse the repository at this point in the history
Fixes and tests for altering topic properties
  • Loading branch information
mmaslankaprv authored Mar 24, 2021
2 parents 6cfb490 + a58870b commit 0e6ca86
Show file tree
Hide file tree
Showing 18 changed files with 397 additions and 76 deletions.
5 changes: 5 additions & 0 deletions src/v/cluster/controller.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
"name": "finish_partition_update",
"input_type": "finish_partition_update_request",
"output_type": "finish_partition_update_reply"
},
{
"name": "update_topic_properties",
"input_type": "update_topic_properties_request",
"output_type": "update_topic_properties_reply"
}
]
}
19 changes: 19 additions & 0 deletions src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,23 @@ service::do_finish_partition_update(finish_partition_update_request&& req) {
co_return finish_partition_update_reply{.result = e};
}

ss::future<update_topic_properties_reply> service::update_topic_properties(
update_topic_properties_request&& req, rpc::streaming_context&) {
return ss::with_scheduling_group(
get_scheduling_group(), [this, req = std::move(req)]() mutable {
return do_update_topic_properties(std::move(req));
});
}

ss::future<update_topic_properties_reply>
service::do_update_topic_properties(update_topic_properties_request&& req) {
// local topic frontend instance will eventually dispatch request to _raft0
// core
auto res = co_await _topics_frontend.local().update_topic_properties(
req.updates,
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());

co_return update_topic_properties_reply{.results = std::move(res)};
}
} // namespace cluster
6 changes: 6 additions & 0 deletions src/v/cluster/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class service : public controller_service {
ss::future<finish_partition_update_reply> finish_partition_update(
finish_partition_update_request&&, rpc::streaming_context&) final;

ss::future<update_topic_properties_reply> update_topic_properties(
update_topic_properties_request&&, rpc::streaming_context&) final;

private:
std::
pair<std::vector<model::topic_metadata>, std::vector<topic_configuration>>
Expand All @@ -51,6 +54,9 @@ class service : public controller_service {
ss::future<finish_partition_update_reply>
do_finish_partition_update(finish_partition_update_request&&);

ss::future<update_topic_properties_reply>
do_update_topic_properties(update_topic_properties_request&&);

ss::sharded<topics_frontend>& _topics_frontend;
ss::sharded<members_manager>& _members_manager;
ss::sharded<metadata_cache>& _md_cache;
Expand Down
112 changes: 94 additions & 18 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ topics_frontend::topics_frontend(
, _leaders(l)
, _as(as) {}

static bool
needs_linearizable_barrier(const std::vector<topic_result>& results) {
return std::any_of(
results.cbegin(), results.cend(), [](const topic_result& r) {
return r.ec == errc::success;
});
}

ss::future<std::vector<topic_result>> topics_frontend::create_topics(
std::vector<topic_configuration> topics,
model::timeout_clock::time_point timeout) {
Expand Down Expand Up @@ -77,23 +85,16 @@ ss::future<std::vector<topic_result>> topics_frontend::create_topics(
});

return ss::when_all_succeed(futures.begin(), futures.end());
});
}

ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
std::vector<topic_properties_update> updates,
model::timeout_clock::time_point timeout) {
auto result = co_await _stm.invoke_on(
controller_stm_shard, [timeout](controller_stm& stm) {
return stm.quorum_write_empty_batch(timeout);
});

if (!result) {
co_return create_topic_results(updates, errc::not_leader_controller);
}
co_return co_await ssx::parallel_transform(
std::move(updates), [this, timeout](topic_properties_update update) {
return do_update_topic_properties(std::move(update), timeout);
})
.then([this, timeout](std::vector<topic_result> results) {
if (needs_linearizable_barrier(results)) {
return stm_linearizable_barrier(timeout).then(
[results = std::move(results)](result<model::offset>) mutable {
return results;
});
}
return ss::make_ready_future<std::vector<topic_result>>(
std::move(results));
});
}

Expand Down Expand Up @@ -130,6 +131,64 @@ cluster::errc map_errc(std::error_code ec) {
return errc::replication_error;
}

ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
std::vector<topic_properties_update> updates,
model::timeout_clock::time_point timeout) {
auto cluster_leader = _leaders.local().get_leader(model::controller_ntp);

// no leader available
if (!cluster_leader) {
co_return create_topic_results(updates, errc::no_leader_controller);
}

// current node is a leader, just replicate
if (cluster_leader == _self) {
// replicate empty batch to make sure leader local state is up to date.
auto result = co_await _stm.invoke_on(
controller_stm_shard, [timeout](controller_stm& stm) {
return stm.quorum_write_empty_batch(timeout);
});
if (!result) {
co_return create_topic_results(updates, map_errc(result.error()));
}

auto results = co_await ssx::parallel_transform(
std::move(updates), [this, timeout](topic_properties_update update) {
return do_update_topic_properties(std::move(update), timeout);
});

// we are not really interested in the result comming from the
// linearizable barrier, results comming from the previous steps will be
// propagated to clients, this is just an optimization, this doesn't
// affect correctness of the protocol
if (needs_linearizable_barrier(results)) {
co_await stm_linearizable_barrier(timeout).discard_result();
}

co_return results;
}

co_return co_await _connections.local()
.with_node_client<controller_client_protocol>(
_self,
ss::this_shard_id(),
*cluster_leader,
timeout,
[updates, timeout](controller_client_protocol client) mutable {
return client
.update_topic_properties(
update_topic_properties_request{.updates = std::move(updates)},
rpc::client_opts(timeout))
.then(&rpc::get_ctx_data<update_topic_properties_reply>);
})
.then([updates](result<update_topic_properties_reply> r) {
if (r.has_error()) {
return create_topic_results(updates, map_errc(r.error()));
}
return std::move(r.value().results);
});
}

ss::future<topic_result> topics_frontend::do_update_topic_properties(
topic_properties_update update, model::timeout_clock::time_point timeout) {
update_topic_properties_cmd cmd(update.tp_ns, update.properties);
Expand Down Expand Up @@ -266,7 +325,17 @@ ss::future<std::vector<topic_result>> topics_frontend::delete_topics(
return do_delete_topic(std::move(tp_ns), timeout);
});

return ss::when_all_succeed(futures.begin(), futures.end());
return ss::when_all_succeed(futures.begin(), futures.end())
.then([this, timeout](std::vector<topic_result> results) {
if (needs_linearizable_barrier(results)) {
return stm_linearizable_barrier(timeout).then(
[results = std::move(results)](result<model::offset>) mutable {
return results;
});
}
return ss::make_ready_future<std::vector<topic_result>>(
std::move(results));
});
}

ss::future<topic_result> topics_frontend::do_delete_topic(
Expand Down Expand Up @@ -401,4 +470,11 @@ ss::future<std::error_code> topics_frontend::finish_moving_partition_replicas(
});
}

ss::future<result<model::offset>> topics_frontend::stm_linearizable_barrier(
model::timeout_clock::time_point timeout) {
return _stm.invoke_on(controller_stm_shard, [timeout](controller_stm& stm) {
return stm.instert_linerizable_barrier(timeout);
});
}

} // namespace cluster
4 changes: 4 additions & 0 deletions src/v/cluster/topics_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class topics_frontend {
ss::future<topic_result> do_update_topic_properties(
topic_properties_update, model::timeout_clock::time_point);
ss::future<> update_leaders_with_estimates(std::vector<ntp_leader>);

ss::future<result<model::offset>>
stm_linearizable_barrier(model::timeout_clock::time_point);

// returns true if the topic name is valid
static bool validate_topic_name(const model::topic_namespace&);

Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,18 @@ adl<cluster::configuration_invariants>::from(iobuf_parser& parser) {

return ret;
}

void adl<cluster::topic_properties_update>::to(
iobuf& out, cluster::topic_properties_update&& r) {
reflection::serialize(out, r.tp_ns, r.properties);
}

cluster::topic_properties_update
adl<cluster::topic_properties_update>::from(iobuf_parser& parser) {
auto tp_ns = adl<model::topic_namespace>{}.from(parser);
cluster::topic_properties_update ret(std::move(tp_ns));
ret.properties = adl<cluster::incremental_topic_updates>{}.from(parser);

return ret;
}
} // namespace reflection
14 changes: 14 additions & 0 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ struct finish_partition_update_reply {
cluster::errc result;
};

struct update_topic_properties_request {
std::vector<topic_properties_update> updates;
};

struct update_topic_properties_reply {
std::vector<topic_result> results;
};

template<typename T>
struct patch {
std::vector<T> additions;
Expand Down Expand Up @@ -317,4 +325,10 @@ struct adl<cluster::configuration_invariants> {
cluster::configuration_invariants from(iobuf_parser&);
};

template<>
struct adl<cluster::topic_properties_update> {
void to(iobuf&, cluster::topic_properties_update&&);
cluster::topic_properties_update from(iobuf_parser&);
};

} // namespace reflection
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ss::future<std::vector<R>> do_alter_topics_configuration(
+ config::shard_local_cfg().alter_topic_cfg_timeout_ms());
for (auto& res : update_results) {
responses.push_back(R{
.error_code = error_code::none,
.error_code = map_topic_error_code(res.ec),
.resource_type = static_cast<int8_t>(config_resource_type::topic),
.resource_name = res.tp_ns.tp(),
});
Expand Down
17 changes: 9 additions & 8 deletions src/v/kafka/server/handlers/describe_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "config/configuration.h"
#include "kafka/protocol/errors.h"
#include "kafka/server/handlers/topics/topic_utils.h"
#include "kafka/server/handlers/topics/types.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
#include "model/fundamental.h"
Expand All @@ -35,11 +36,11 @@ namespace kafka {
template<typename T>
static void add_config(
describe_configs_result& result,
const char* name,
std::string_view name,
T value,
describe_configs_source source) {
result.configs.push_back(describe_configs_resource_result{
.name = name,
.name = ss::sstring(name),
.value = fmt::format("{}", value),
.config_source = source,
});
Expand Down Expand Up @@ -193,22 +194,22 @@ ss::future<response_ptr> describe_configs_handler::handle(

add_config(
result,
"cleanup.policy",
topic_property_cleanup_policy,
describe_topic_cleanup_policy(
topic_config,
ctx.metadata_cache().get_default_cleanup_policy_bitflags()),
describe_configs_source::topic);

add_config(
result,
"compression.type",
topic_property_compression,
topic_config->properties.compression.value_or(
ctx.metadata_cache().get_default_compression()),
describe_configs_source::topic);

add_config(
result,
"segment.bytes",
topic_property_segment_size,
topic_config->properties.segment_size.value_or(
topic_config->properties.is_compacted()
? ctx.metadata_cache()
Expand All @@ -218,23 +219,23 @@ ss::future<response_ptr> describe_configs_handler::handle(

add_config(
result,
"retention.ms",
topic_property_retention_duration,
describe_retention_duration(
topic_config->properties.retention_duration,
ctx.metadata_cache().get_default_retention_duration()),
describe_configs_source::topic);

add_config(
result,
"retention.bytes",
topic_property_retention_bytes,
describe_retention_bytes(
topic_config->properties.retention_bytes,
ctx.metadata_cache().get_default_retention_bytes()),
describe_configs_source::topic);

add_config(
result,
"timestamp.type",
topic_property_timestamp_type,
topic_config->properties.timestamp_type.value_or(
ctx.metadata_cache().get_default_timestamp_type()),
describe_configs_source::topic);
Expand Down
Loading

0 comments on commit 0e6ca86

Please sign in to comment.