Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented INCREMENTAL_ALTER_CONFIGS api #939

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 23 additions & 127 deletions src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

#include "kafka/server/handlers/alter_configs.h"

#include "cluster/topics_frontend.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/schemata/alter_configs_request.h"
#include "kafka/protocol/schemata/alter_configs_response.h"
#include "kafka/server/handlers/configs/config_utils.h"
#include "kafka/server/handlers/topics/types.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
Expand All @@ -32,55 +32,6 @@
#include <string_view>

namespace kafka {
/**
* Groupped alter_configs_resources
*
* NOTE:
* We do not have to differentiate between broker and broker_logger as
* broker_logger is deprecated it will be enough to generate error response.
*/
struct groupped_resources {
std::vector<alter_configs_resource> topic_changes;
std::vector<alter_configs_resource> broker_changes;
};

groupped_resources
group_alter_config_resources(std::vector<alter_configs_resource> req) {
groupped_resources ret;
for (auto& res : req) {
switch (config_resource_type(res.resource_type)) {
case config_resource_type::topic:
ret.topic_changes.push_back(std::move(res));
break;
default:
ret.broker_changes.push_back(std::move(res));
};
}
return ret;
}

alter_configs_response assemble_alter_config_response(
std::vector<std::vector<alter_configs_resource_response>> responses) {
alter_configs_response response;
for (auto& v : responses) {
std::move(
v.begin(), v.end(), std::back_inserter(response.data.responses));
}

return response;
}

alter_configs_resource_response make_error_alter_config_resource_response(
alter_configs_resource& resource,
kafka::error_code err,
std::optional<ss::sstring> msg = {}) {
return alter_configs_resource_response{
.error_code = err,
.error_message = std::move(msg),
.resource_type = resource.resource_type,
.resource_name = resource.resource_name};
}

template<typename T>
void parse_and_set_optional(
cluster::property_update<std::optional<T>>& property_update,
Expand Down Expand Up @@ -173,15 +124,17 @@ create_topic_properties_update(alter_configs_resource& resource) {
continue;
}
} catch (const boost::bad_lexical_cast& e) {
return make_error_alter_config_resource_response(
return make_error_alter_config_resource_response<
alter_configs_resource_response>(
resource,
error_code::invalid_config,
fmt::format(
"unable to parse property {} value {}", cfg.name, cfg.value));
}

// Unsupported property, return error
return make_error_alter_config_resource_response(
return make_error_alter_config_resource_response<
alter_configs_resource_response>(
resource,
error_code::invalid_config,
fmt::format("invalid topic property: {}", cfg.name));
Expand All @@ -190,85 +143,24 @@ create_topic_properties_update(alter_configs_resource& resource) {
return update;
}

ss::future<std::vector<alter_configs_resource_response>>
alter_topics_configuration(
static ss::future<std::vector<alter_configs_resource_response>>
alter_topic_configuration(
request_context& ctx,
std::vector<alter_configs_resource> resources,
bool validate_only) {
std::vector<alter_configs_resource_response> responses;
responses.reserve(resources.size());

absl::node_hash_set<ss::sstring> topic_names;
auto valid_end = std::stable_partition(
resources.begin(),
resources.end(),
[&topic_names](alter_configs_resource& r) {
return !topic_names.contains(r.resource_name);
return do_alter_topics_configuration<
alter_configs_resource,
alter_configs_resource_response>(
ctx, std::move(resources), validate_only, [](alter_configs_resource& r) {
return create_topic_properties_update(r);
});

for (auto& r : boost::make_iterator_range(valid_end, resources.end())) {
responses.push_back(make_error_alter_config_resource_response(
r,
error_code::invalid_config,
"duplicated topic {} alter config request"));
}
std::vector<cluster::topic_properties_update> updates;
for (auto& r : boost::make_iterator_range(resources.begin(), valid_end)) {
auto res = create_topic_properties_update(r);
if (res.has_error()) {
responses.push_back(std::move(res.error()));
} else {
updates.push_back(std::move(res.value()));
}
}

if (validate_only) {
// all pending updates are valid, just generate responses
for (auto& u : updates) {
responses.push_back(alter_configs_resource_response{
.error_code = error_code::none,
.resource_type = static_cast<int8_t>(config_resource_type::topic),
.resource_name = u.tp_ns.tp,
});
}

co_return responses;
}

auto update_results
= co_await ctx.topics_frontend().update_topic_properties(
std::move(updates),
model::timeout_clock::now()
+ config::shard_local_cfg().alter_topic_cfg_timeout_ms());
for (auto& res : update_results) {
responses.push_back(alter_configs_resource_response{
.error_code = error_code::none,
.resource_type = static_cast<int8_t>(config_resource_type::topic),
.resource_name = res.tp_ns.tp(),
});
}
co_return responses;
}

ss::future<std::vector<alter_configs_resource_response>>
static ss::future<std::vector<alter_configs_resource_response>>
alter_broker_configuartion(std::vector<alter_configs_resource> resources) {
// for now we do not support altering any of brokers config, generate errors
std::vector<alter_configs_resource_response> responses;
responses.reserve(resources.size());
std::transform(
resources.begin(),
resources.end(),
std::back_inserter(responses),
[](alter_configs_resource& resource) {
return make_error_alter_config_resource_response(
resource,
error_code::invalid_config,
fmt::format(
"changing '{}' broker property isn't currently supported",
resource.resource_name));
});

co_return responses;
return do_alter_broker_configuartion<
alter_configs_resource,
alter_configs_resource_response>(std::move(resources));
}

template<>
Expand All @@ -283,13 +175,17 @@ ss::future<response_ptr> alter_configs_handler::handle(
std::vector<ss::future<std::vector<alter_configs_resource_response>>>
futures;
futures.reserve(2);
futures.push_back(alter_topics_configuration(
futures.push_back(alter_topic_configuration(
ctx, std::move(groupped.topic_changes), request.data.validate_only));
futures.push_back(
alter_broker_configuartion(std::move(groupped.broker_changes)));

auto ret = co_await ss::when_all_succeed(futures.begin(), futures.end());

co_return co_await ctx.respond(
assemble_alter_config_response(std::move(ret)));
assemble_alter_config_response<
alter_configs_response,
alter_configs_resource_response>(std::move(ret)));
}

} // namespace kafka
152 changes: 152 additions & 0 deletions src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@

/*
* Copyright 2021 Vectorized, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "cluster/topics_frontend.h"
#include "cluster/types.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fwd.h"
#include "kafka/server/handlers/topics/types.h"
#include "kafka/server/request_context.h"
#include "kafka/types.h"
#include "outcome.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/sstring.hh>

#include <absl/container/node_hash_set.h>

#include <optional>

namespace kafka {
template<typename T>
struct groupped_resources {
std::vector<T> topic_changes;
std::vector<T> broker_changes;
};

template<typename T>
groupped_resources<T> group_alter_config_resources(std::vector<T> req) {
groupped_resources<T> ret;
for (auto& res : req) {
switch (config_resource_type(res.resource_type)) {
case config_resource_type::topic:
ret.topic_changes.push_back(std::move(res));
break;
default:
ret.broker_changes.push_back(std::move(res));
};
}
return ret;
}

template<typename T, typename R>
T assemble_alter_config_response(std::vector<std::vector<R>> responses) {
T response;
for (auto& v : responses) {
std::move(
v.begin(), v.end(), std::back_inserter(response.data.responses));
}

return response;
}
template<typename T, typename R>
T make_error_alter_config_resource_response(
const R& resource, error_code err, std::optional<ss::sstring> msg = {}) {
return T{
.error_code = err,
.error_message = std::move(msg),
.resource_type = resource.resource_type,
.resource_name = resource.resource_name};
}

template<typename T, typename R, typename Func>
ss::future<std::vector<R>> do_alter_topics_configuration(
request_context& ctx, std::vector<T> resources, bool validate_only, Func f) {
std::vector<R> responses;
responses.reserve(resources.size());

absl::node_hash_set<ss::sstring> topic_names;
auto valid_end = std::stable_partition(
resources.begin(), resources.end(), [&topic_names](T& r) {
return !topic_names.contains(r.resource_name);
});

for (auto& r : boost::make_iterator_range(valid_end, resources.end())) {
responses.push_back(make_error_alter_config_resource_response<R>(
r,
error_code::invalid_config,
"duplicated topic {} alter config request"));
}
std::vector<cluster::topic_properties_update> updates;
for (auto& r : boost::make_iterator_range(resources.begin(), valid_end)) {
auto res = f(r);
if (res.has_error()) {
responses.push_back(std::move(res.error()));
} else {
updates.push_back(std::move(res.value()));
}
}

if (validate_only) {
// all pending updates are valid, just generate responses
for (auto& u : updates) {
responses.push_back(R{
.error_code = error_code::none,
.resource_type = static_cast<int8_t>(config_resource_type::topic),
.resource_name = u.tp_ns.tp,
});
}

co_return responses;
}

auto update_results
= co_await ctx.topics_frontend().update_topic_properties(
std::move(updates),
model::timeout_clock::now()
+ config::shard_local_cfg().alter_topic_cfg_timeout_ms());
for (auto& res : update_results) {
responses.push_back(R{
.error_code = error_code::none,
.resource_type = static_cast<int8_t>(config_resource_type::topic),
.resource_name = res.tp_ns.tp(),
});
}
co_return responses;
}

template<typename T, typename R>
ss::future<std::vector<R>>
do_alter_broker_configuartion(std::vector<T> resources) {
// for now we do not support altering any of brokers config, generate
// errors
std::vector<R> responses;
responses.reserve(resources.size());
std::transform(
resources.begin(),
resources.end(),
std::back_inserter(responses),
[](T& resource) {
return make_error_alter_config_resource_response<R>(
resource,
error_code::invalid_config,
fmt::format(
"changing '{}' broker property isn't currently supported",
resource.resource_name));
});

return ss::make_ready_future<std::vector<R>>(std::move(responses));
}

} // namespace kafka
Loading