Skip to content

Commit

Permalink
Merge pull request #1187 from dotnwat/metadata-codegen
Browse files Browse the repository at this point in the history
kafka: use code generation for metadata api request
  • Loading branch information
dotnwat authored Apr 17, 2021
2 parents 3da918d + 5162a17 commit 98f1b9b
Show file tree
Hide file tree
Showing 19 changed files with 295 additions and 374 deletions.
13 changes: 8 additions & 5 deletions src/v/coproc/tests/read_materialized_topic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ FIXTURE_TEST(test_metadata_request, router_test_fixture) {

/// Make a metadata request specifically for the materialized topic
kafka::metadata_request req{
.topics = {{output_topic}}, .list_all_topics = false};
.data = {.topics = {{{output_topic}}}},
.list_all_topics = false,
};
auto client = make_kafka_client().get0();
client.connect().get();
auto resp = client.dispatch(req, kafka::api_version(4)).get0();
client.stop().get();
BOOST_REQUIRE_EQUAL(resp.topics.size(), 1);
BOOST_REQUIRE_EQUAL(resp.topics[0].err_code, kafka::error_code::none);
BOOST_REQUIRE_EQUAL(resp.topics[0].name, output_topic);
BOOST_REQUIRE_EQUAL(resp.topics[0].partitions.size(), 1);
BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 1);
BOOST_REQUIRE_EQUAL(
resp.data.topics[0].error_code, kafka::error_code::none);
BOOST_REQUIRE_EQUAL(resp.data.topics[0].name, output_topic);
BOOST_REQUIRE_EQUAL(resp.data.topics[0].partitions.size(), 1);
}

FIXTURE_TEST(test_read_from_materialized_topic, router_test_fixture) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/client/assignment_plans.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ assignments assignment_range::plan(
rtm.reserve(std::distance(p_begin, p_end));
std::transform(
p_begin, p_end, std::back_inserter(rtm), [](auto& p) {
return p.index;
return p.partition_index;
});
p_begin = p_end;
++mem_it;
Expand Down
8 changes: 4 additions & 4 deletions src/v/kafka/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ ss::future<> client::update_metadata(wait_or_start::tag) {
.then([this](metadata_response res) {
// Create new seeds from the returned set of brokers
std::vector<unresolved_address> seeds;
seeds.reserve(res.brokers.size());
for (const auto& b : res.brokers) {
seeds.reserve(res.data.brokers.size());
for (const auto& b : res.data.brokers) {
seeds.emplace_back(b.host, b.port);
}
std::swap(_seeds, seeds);
Expand All @@ -125,8 +125,8 @@ ss::future<> client::update_metadata(wait_or_start::tag) {
}

ss::future<> client::apply(metadata_response res) {
co_await _brokers.apply(std::move(res.brokers));
co_await _topic_cache.apply(std::move(res.topics));
co_await _brokers.apply(std::move(res.data.brokers));
co_await _topic_cache.apply(std::move(res.data.topics));
}

ss::future<> client::mitigate_error(std::exception_ptr ex) {
Expand Down
11 changes: 7 additions & 4 deletions src/v/kafka/client/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct partition_comp {
bool operator()(
const metadata_response::partition& lhs,
const metadata_response::partition& rhs) const {
return lhs.index < rhs.index;
return lhs.partition_index < rhs.partition_index;
}
};

Expand Down Expand Up @@ -215,12 +215,15 @@ consumer::get_subscribed_topic_metadata() {
.then([this](metadata_response res) {
std::vector<sync_group_request_assignment> assignments;

std::sort(res.topics.begin(), res.topics.end(), detail::topic_comp{});
std::sort(
res.data.topics.begin(),
res.data.topics.end(),
detail::topic_comp{});
std::vector<metadata_response::topic> topics;
topics.reserve(_subscribed_topics.size());
std::set_intersection(
res.topics.begin(),
res.topics.end(),
res.data.topics.begin(),
res.data.topics.end(),
_subscribed_topics.begin(),
_subscribed_topics.end(),
std::back_inserter(topics),
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/client/test/produce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ FIXTURE_TEST(produce_reconnect, kafka_client_fixture) {

info("Client.dispatch metadata");
auto res = client.dispatch(make_list_topics_req()).get();
BOOST_REQUIRE_EQUAL(res.topics.size(), 1);
BOOST_REQUIRE_EQUAL(res.topics[0].name(), "t");
BOOST_REQUIRE_EQUAL(res.data.topics.size(), 1);
BOOST_REQUIRE_EQUAL(res.data.topics[0].name(), "t");

client.config().produce_batch_record_count.set_value(3);
client.config().produce_batch_size_bytes.set_value(1024);
Expand Down
10 changes: 5 additions & 5 deletions src/v/kafka/client/test/reconnect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ FIXTURE_TEST(reconnect, kafka_client_fixture) {
{
info("Checking no topics");
auto res = client.dispatch(make_list_topics_req()).get();
BOOST_REQUIRE_EQUAL(res.topics.size(), 0);
BOOST_REQUIRE_EQUAL(res.data.topics.size(), 0);
}

{
Expand All @@ -48,8 +48,8 @@ FIXTURE_TEST(reconnect, kafka_client_fixture) {
{
info("Checking for known topic");
auto res = client.dispatch(make_list_topics_req()).get();
BOOST_REQUIRE_EQUAL(res.topics.size(), 1);
BOOST_REQUIRE_EQUAL(res.topics[0].name(), "t");
BOOST_REQUIRE_EQUAL(res.data.topics.size(), 1);
BOOST_REQUIRE_EQUAL(res.data.topics[0].name(), "t");
}

{
Expand All @@ -67,8 +67,8 @@ FIXTURE_TEST(reconnect, kafka_client_fixture) {
client.config().retries.set_value(size_t(5));
info("Checking for known topic - controller ready");
auto res = client.dispatch(make_list_topics_req()).get();
BOOST_REQUIRE_EQUAL(res.topics.size(), 1);
BOOST_REQUIRE_EQUAL(res.topics[0].name(), "t");
BOOST_REQUIRE_EQUAL(res.data.topics.size(), 1);
BOOST_REQUIRE_EQUAL(res.data.topics[0].name(), "t");
}

info("Stopping client");
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/client/topic_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ topic_cache::apply(std::vector<metadata_response::topic>&& topics) {
cache_t.partitions.reserve(t.partitions.size());
for (auto const& p : t.partitions) {
cache_t.partitions.emplace(
p.index, partition_data{.leader = p.leader});
p.partition_index, partition_data{.leader = p.leader_id});
}
cache_t.partitions.rehash(0);
}
Expand Down
85 changes: 39 additions & 46 deletions src/v/kafka/protocol/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#pragma once

#include "kafka/protocol/schemata/metadata_request.h"
#include "kafka/protocol/schemata/metadata_response.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
#include "model/metadata.h"
Expand All @@ -34,62 +36,53 @@ struct metadata_api final {
struct metadata_request {
using api_type = metadata_api;

std::optional<std::vector<model::topic>> topics;
bool allow_auto_topic_creation = true; // version >= 4
bool include_cluster_authorized_operations = false; // version >= 8
bool include_topic_authorized_operations = false; // version >= 8
metadata_request_data data;

bool list_all_topics{false};

void encode(response_writer& writer, api_version version);
void decode(request_context& ctx);
void encode(response_writer& writer, api_version version) {
data.encode(writer, version);
}

void decode(request_reader& reader, api_version version) {
data.decode(reader, version);
if (version > api_version(0)) {
list_all_topics = !data.topics;
} else {
if (unlikely(!data.topics)) {
// Version 0 of protocol doesn't use nullable topics set
throw std::runtime_error(
"Null topics received for version 0 of metadata request");
}
// For metadata API version 0, empty array requests all topics
list_all_topics = data.topics->empty();
}
}
};

std::ostream& operator<<(std::ostream&, const metadata_request&);
inline std::ostream& operator<<(std::ostream& os, const metadata_request& r) {
return os << r.data;
}

struct metadata_response {
using api_type = metadata_api;
using topic = metadata_response_topic;
using partition = metadata_response_partition;
using broker = metadata_response_broker;

struct broker {
model::node_id node_id;
ss::sstring host;
int32_t port;
std::optional<ss::sstring> rack; // version >= 1
};

struct partition {
error_code err_code;
model::partition_id index;
model::node_id leader;
int32_t leader_epoch; // version >= 7
std::vector<model::node_id> replica_nodes;
std::vector<model::node_id> isr_nodes;
std::vector<model::node_id> offline_replicas; // version >= 5
void encode(api_version version, response_writer& rw) const;
};

struct topic {
error_code err_code;
model::topic name;
bool is_internal{false}; // version >= 1
std::vector<partition> partitions;
int32_t topic_authorized_operations; // version >= 8
void encode(api_version version, response_writer& rw) const;
static topic make_from_topic_metadata(model::topic_metadata&& tp_md);
};

std::chrono::milliseconds throttle_time = std::chrono::milliseconds(
0); // version >= 3
std::vector<broker> brokers;
std::optional<ss::sstring> cluster_id; // version >= 2
model::node_id controller_id; // version >= 1
std::vector<topic> topics;
int32_t cluster_authorized_operations = 0; // version >= 8

void encode(const request_context& ctx, response& resp);
void decode(iobuf buf, api_version version);
metadata_response_data data;

void encode(const request_context& ctx, response& resp) {
data.encode(resp.writer(), ctx.header().version);
}

void decode(iobuf buf, api_version version) {
data.decode(std::move(buf), version);
}
};

std::ostream& operator<<(std::ostream&, const metadata_response&);
inline std::ostream& operator<<(std::ostream& os, const metadata_response& r) {
return os << r.data;
}

} // namespace kafka
4 changes: 3 additions & 1 deletion src/v/kafka/protocol/schemata/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ set(schemata
delete_acls_request.json
delete_acls_response.json
produce_request.json
produce_response.json)
produce_response.json
metadata_request.json
metadata_response.json)

set(srcs)
foreach(schema ${schemata})
Expand Down
12 changes: 12 additions & 0 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@
},
},
},
"MetadataResponseData": {
"Topics": {
"Partitions": {
"PartitionIndex": ("model::partition_id", "int32"),
"IsrNodes": ("model::node_id", "int32"),
},
},
},
}

# a few kafka field types specify an entity type
Expand Down Expand Up @@ -299,6 +307,10 @@
"BatchIndexAndErrorMessage",
"TopicProduceData",
"PartitionProduceData",
"MetadataResponseBroker",
"MetadataRequestTopic",
"MetadataResponseTopic",
"MetadataResponsePartition",
]

SCALAR_TYPES = list(basic_type_map.keys())
Expand Down
46 changes: 46 additions & 0 deletions src/v/kafka/protocol/schemata/metadata_request.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 3,
"type": "request",
"name": "MetadataRequest",
"validVersions": "0-9",
"flexibleVersions": "9+",
"fields": [
// In version 0, an empty array indicates "request metadata for all topics." In version 1 and
// higher, an empty array indicates "request metadata for no topics," and a null array is used to
// indiate "request metadata for all topics."
//
// Version 2 and 3 are the same as version 1.
//
// Version 4 adds AllowAutoTopicCreation.
//
// Starting in version 8, authorized operations can be requested for cluster and topic resource.
//
// Version 9 is the first flexible version.
{ "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
"about": "The topics to fetch metadata for.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." }
]},
{ "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,
"about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." },
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8+",
"about": "Whether to include cluster authorized operations." },
{ "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
"about": "Whether to include topic authorized operations." }
]
}
Loading

0 comments on commit 98f1b9b

Please sign in to comment.