Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and tests for altering topic properties #947

Merged
merged 11 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/v/cluster/controller.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
"name": "finish_partition_update",
"input_type": "finish_partition_update_request",
"output_type": "finish_partition_update_reply"
},
{
"name": "update_topic_properties",
"input_type": "update_topic_properties_request",
"output_type": "update_topic_properties_reply"
}
]
}
19 changes: 19 additions & 0 deletions src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,23 @@ service::do_finish_partition_update(finish_partition_update_request&& req) {
co_return finish_partition_update_reply{.result = e};
}

ss::future<update_topic_properties_reply> service::update_topic_properties(
update_topic_properties_request&& req, rpc::streaming_context&) {
return ss::with_scheduling_group(
get_scheduling_group(), [this, req = std::move(req)]() mutable {
return do_update_topic_properties(std::move(req));
});
}

ss::future<update_topic_properties_reply>
service::do_update_topic_properties(update_topic_properties_request&& req) {
// local topic frontend instance will eventually dispatch request to _raft0
// core
auto res = co_await _topics_frontend.local().update_topic_properties(
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
req.updates,
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());

co_return update_topic_properties_reply{.results = std::move(res)};
}
} // namespace cluster
6 changes: 6 additions & 0 deletions src/v/cluster/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class service : public controller_service {
ss::future<finish_partition_update_reply> finish_partition_update(
finish_partition_update_request&&, rpc::streaming_context&) final;

ss::future<update_topic_properties_reply> update_topic_properties(
update_topic_properties_request&&, rpc::streaming_context&) final;

private:
std::
pair<std::vector<model::topic_metadata>, std::vector<topic_configuration>>
Expand All @@ -51,6 +54,9 @@ class service : public controller_service {
ss::future<finish_partition_update_reply>
do_finish_partition_update(finish_partition_update_request&&);

ss::future<update_topic_properties_reply>
do_update_topic_properties(update_topic_properties_request&&);

ss::sharded<topics_frontend>& _topics_frontend;
ss::sharded<members_manager>& _members_manager;
ss::sharded<metadata_cache>& _md_cache;
Expand Down
112 changes: 94 additions & 18 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ topics_frontend::topics_frontend(
, _leaders(l)
, _as(as) {}

static bool
needs_linearizable_barrier(const std::vector<topic_result>& results) {
return std::any_of(
results.cbegin(), results.cend(), [](const topic_result& r) {
return r.ec == errc::success;
});
}

ss::future<std::vector<topic_result>> topics_frontend::create_topics(
std::vector<topic_configuration> topics,
model::timeout_clock::time_point timeout) {
Expand Down Expand Up @@ -77,23 +85,16 @@ ss::future<std::vector<topic_result>> topics_frontend::create_topics(
});

return ss::when_all_succeed(futures.begin(), futures.end());
});
}

ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
std::vector<topic_properties_update> updates,
model::timeout_clock::time_point timeout) {
auto result = co_await _stm.invoke_on(
controller_stm_shard, [timeout](controller_stm& stm) {
return stm.quorum_write_empty_batch(timeout);
});

if (!result) {
co_return create_topic_results(updates, errc::not_leader_controller);
}
co_return co_await ssx::parallel_transform(
std::move(updates), [this, timeout](topic_properties_update update) {
return do_update_topic_properties(std::move(update), timeout);
})
.then([this, timeout](std::vector<topic_result> results) {
if (needs_linearizable_barrier(results)) {
return stm_linearizable_barrier(timeout).then(
[results = std::move(results)](result<model::offset>) mutable {
return results;
});
}
return ss::make_ready_future<std::vector<topic_result>>(
std::move(results));
});
}

Expand Down Expand Up @@ -130,6 +131,64 @@ cluster::errc map_errc(std::error_code ec) {
return errc::replication_error;
}

ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(
std::vector<topic_properties_update> updates,
model::timeout_clock::time_point timeout) {
auto cluster_leader = _leaders.local().get_leader(model::controller_ntp);

// no leader available
if (!cluster_leader) {
co_return create_topic_results(updates, errc::no_leader_controller);
}

// current node is a leader, just replicate
if (cluster_leader == _self) {
// replicate empty batch to make sure leader local state is up to date.
auto result = co_await _stm.invoke_on(
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
controller_stm_shard, [timeout](controller_stm& stm) {
return stm.quorum_write_empty_batch(timeout);
});
if (!result) {
co_return create_topic_results(updates, map_errc(result.error()));
}

auto results = co_await ssx::parallel_transform(
std::move(updates), [this, timeout](topic_properties_update update) {
return do_update_topic_properties(std::move(update), timeout);
});

// we are not really interested in the result comming from the
// linearizable barrier, results comming from the previous steps will be
// propagated to clients, this is just an optimization, this doesn't
// affect correctness of the protocol
if (needs_linearizable_barrier(results)) {
co_await stm_linearizable_barrier(timeout).discard_result();
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
}

co_return results;
}

co_return co_await _connections.local()
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
.with_node_client<controller_client_protocol>(
_self,
ss::this_shard_id(),
*cluster_leader,
timeout,
[updates, timeout](controller_client_protocol client) mutable {
return client
.update_topic_properties(
update_topic_properties_request{.updates = std::move(updates)},
rpc::client_opts(timeout))
.then(&rpc::get_ctx_data<update_topic_properties_reply>);
})
.then([updates](result<update_topic_properties_reply> r) {
if (r.has_error()) {
return create_topic_results(updates, map_errc(r.error()));
}
return std::move(r.value().results);
});
}

ss::future<topic_result> topics_frontend::do_update_topic_properties(
topic_properties_update update, model::timeout_clock::time_point timeout) {
update_topic_properties_cmd cmd(update.tp_ns, update.properties);
Expand Down Expand Up @@ -266,7 +325,17 @@ ss::future<std::vector<topic_result>> topics_frontend::delete_topics(
return do_delete_topic(std::move(tp_ns), timeout);
});

return ss::when_all_succeed(futures.begin(), futures.end());
return ss::when_all_succeed(futures.begin(), futures.end())
.then([this, timeout](std::vector<topic_result> results) {
if (needs_linearizable_barrier(results)) {
return stm_linearizable_barrier(timeout).then(
[results = std::move(results)](result<model::offset>) mutable {
return results;
});
}
return ss::make_ready_future<std::vector<topic_result>>(
std::move(results));
});
}

ss::future<topic_result> topics_frontend::do_delete_topic(
Expand Down Expand Up @@ -401,4 +470,11 @@ ss::future<std::error_code> topics_frontend::finish_moving_partition_replicas(
});
}

ss::future<result<model::offset>> topics_frontend::stm_linearizable_barrier(
model::timeout_clock::time_point timeout) {
return _stm.invoke_on(controller_stm_shard, [timeout](controller_stm& stm) {
return stm.instert_linerizable_barrier(timeout);
});
}

} // namespace cluster
4 changes: 4 additions & 0 deletions src/v/cluster/topics_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class topics_frontend {
ss::future<topic_result> do_update_topic_properties(
topic_properties_update, model::timeout_clock::time_point);
ss::future<> update_leaders_with_estimates(std::vector<ntp_leader>);

ss::future<result<model::offset>>
stm_linearizable_barrier(model::timeout_clock::time_point);

// returns true if the topic name is valid
static bool validate_topic_name(const model::topic_namespace&);

Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,18 @@ adl<cluster::configuration_invariants>::from(iobuf_parser& parser) {

return ret;
}

void adl<cluster::topic_properties_update>::to(
iobuf& out, cluster::topic_properties_update&& r) {
reflection::serialize(out, r.tp_ns, r.properties);
}

cluster::topic_properties_update
adl<cluster::topic_properties_update>::from(iobuf_parser& parser) {
auto tp_ns = adl<model::topic_namespace>{}.from(parser);
cluster::topic_properties_update ret(std::move(tp_ns));
ret.properties = adl<cluster::incremental_topic_updates>{}.from(parser);

return ret;
}
} // namespace reflection
14 changes: 14 additions & 0 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ struct finish_partition_update_reply {
cluster::errc result;
};

struct update_topic_properties_request {
std::vector<topic_properties_update> updates;
};

struct update_topic_properties_reply {
std::vector<topic_result> results;
};

template<typename T>
struct patch {
std::vector<T> additions;
Expand Down Expand Up @@ -317,4 +325,10 @@ struct adl<cluster::configuration_invariants> {
cluster::configuration_invariants from(iobuf_parser&);
};

template<>
struct adl<cluster::topic_properties_update> {
void to(iobuf&, cluster::topic_properties_update&&);
cluster::topic_properties_update from(iobuf_parser&);
};

} // namespace reflection
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ss::future<std::vector<R>> do_alter_topics_configuration(
+ config::shard_local_cfg().alter_topic_cfg_timeout_ms());
for (auto& res : update_results) {
responses.push_back(R{
.error_code = error_code::none,
.error_code = map_topic_error_code(res.ec),
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
.resource_type = static_cast<int8_t>(config_resource_type::topic),
.resource_name = res.tp_ns.tp(),
});
Expand Down
17 changes: 9 additions & 8 deletions src/v/kafka/server/handlers/describe_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "config/configuration.h"
#include "kafka/protocol/errors.h"
#include "kafka/server/handlers/topics/topic_utils.h"
#include "kafka/server/handlers/topics/types.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
#include "model/fundamental.h"
Expand All @@ -35,11 +36,11 @@ namespace kafka {
template<typename T>
static void add_config(
describe_configs_result& result,
const char* name,
std::string_view name,
T value,
describe_configs_source source) {
result.configs.push_back(describe_configs_resource_result{
.name = name,
.name = ss::sstring(name),
.value = fmt::format("{}", value),
.config_source = source,
});
Expand Down Expand Up @@ -193,22 +194,22 @@ ss::future<response_ptr> describe_configs_handler::handle(

add_config(
result,
"cleanup.policy",
topic_property_cleanup_policy,
describe_topic_cleanup_policy(
topic_config,
ctx.metadata_cache().get_default_cleanup_policy_bitflags()),
describe_configs_source::topic);

add_config(
result,
"compression.type",
topic_property_compression,
topic_config->properties.compression.value_or(
ctx.metadata_cache().get_default_compression()),
describe_configs_source::topic);

add_config(
result,
"segment.bytes",
topic_property_segment_size,
topic_config->properties.segment_size.value_or(
topic_config->properties.is_compacted()
? ctx.metadata_cache()
Expand All @@ -218,23 +219,23 @@ ss::future<response_ptr> describe_configs_handler::handle(

add_config(
result,
"retention.ms",
topic_property_retention_duration,
describe_retention_duration(
topic_config->properties.retention_duration,
ctx.metadata_cache().get_default_retention_duration()),
describe_configs_source::topic);

add_config(
result,
"retention.bytes",
topic_property_retention_bytes,
describe_retention_bytes(
topic_config->properties.retention_bytes,
ctx.metadata_cache().get_default_retention_bytes()),
describe_configs_source::topic);

add_config(
result,
"timestamp.type",
topic_property_timestamp_type,
topic_config->properties.timestamp_type.value_or(
ctx.metadata_cache().get_default_timestamp_type()),
describe_configs_source::topic);
Expand Down
Loading