kafka/test: Allow multiple consumers, producers, and partitions
Refactor only, no functional change.

Signed-off-by: Ben Pope <ben@redpanda.com>
BenPope committed Feb 28, 2024
1 parent bd42479 commit 20ca50a
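As a usage sketch of the refactored fixture (a hypothetical test: the name, partition count, and batch size are illustrative and not part of this commit; it presumes the includes already in this file), a test can now drive one producer/consumer pair per partition:

FIXTURE_TEST(example_multi_partition, prod_consume_fixture) {
    wait_for_controller_leadership().get();

    constexpr unsigned partition_count = 3;
    // Creates the topic with one partition, one producer, and one
    // consumer per index.
    start(partition_count);

    for (unsigned i = 0; i < partition_count; ++i) {
        const auto p_id = model::partition_id(i);
        // Produce one batch to partition i through producer i.
        produce_raw(producers[i], single_batch(p_id, 4096)).get();
        // Read it back through consumer i; fetch_offsets[i] advances the
        // same way a Kafka client's next fetch offset would.
        auto resp = fetch_next(consumers[i], p_id).get();
        BOOST_REQUIRE(!resp.data.topics.empty());
    }
}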
146 changes: 79 additions & 67 deletions src/v/kafka/server/tests/produce_consume_test.cc
@@ -33,26 +33,39 @@ using std::vector;
using tests::kv_t;

struct prod_consume_fixture : public redpanda_thread_fixture {
void start() {
consumer = std::make_unique<kafka::client::transport>(
make_kafka_client().get0());
producer = std::make_unique<kafka::client::transport>(
make_kafka_client().get0());
consumer->connect().get0();
producer->connect().get0();
model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
add_topic(tp_ns).get0();
model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
tests::cooperative_spin_wait_with_timeout(2s, [ntp, this] {
auto shard = app.shard_table.local().shard_for(ntp);
if (!shard) {
return ss::make_ready_future<bool>(false);
}
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& pm) {
return pm.get(ntp)->is_leader();
});
}).get0();
void start(unsigned int count = 1) {
producers.reserve(count);
consumers.reserve(count);
fetch_offsets.resize(count, model::offset{0});

add_topic(test_tp_ns, static_cast<int>(count)).get();

ss::parallel_for_each(boost::irange(0u, count), [&](auto i) {
consumers.emplace_back(make_kafka_client().get());
auto& consumer = consumers.back();

producers.emplace_back(make_kafka_client().get());
auto& producer = producers.back();

model::ntp ntp(
test_tp_ns.ns, test_tp_ns.tp, model::partition_id(i));
return ss::when_all_succeed(
producer.connect(),
consumer.connect(),
tests::cooperative_spin_wait_with_timeout(
2s,
[ntp, this] {
auto shard = app.shard_table.local().shard_for(ntp);
if (!shard) {
return ss::make_ready_future<bool>(false);
}
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& pm) {
return pm.get(ntp)->is_leader();
});
}))
.discard_result();
}).get();
}

std::vector<kafka::produce_request::partition> small_batches(size_t count) {
@@ -74,8 +87,9 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
return res;
}

ss::future<kafka::produce_response>
produce_raw(std::vector<kafka::produce_request::partition>&& partitions) {
ss::future<kafka::produce_response> produce_raw(
kafka::client::transport& producer,
std::vector<kafka::produce_request::partition>&& partitions) {
kafka::produce_request::topic tp;
tp.partitions = std::move(partitions);
tp.name = test_topic;
@@ -85,7 +99,12 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
req.data.timeout_ms = std::chrono::seconds(2);
req.has_idempotent = false;
req.has_transactional = false;
return producer->dispatch(std::move(req));
return producer.dispatch(std::move(req));
}

ss::future<kafka::produce_response>
produce_raw(std::vector<kafka::produce_request::partition>&& partitions) {
return produce_raw(producers.front(), std::move(partitions));
}

template<typename T>
@@ -98,10 +117,11 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
});
}

ss::future<kafka::fetch_response> fetch_next() {
ss::future<kafka::fetch_response>
fetch_next(kafka::client::transport& consumer, model::partition_id p_id) {
kafka::fetch_request::partition partition;
partition.fetch_offset = fetch_offset;
partition.partition_index = model::partition_id(0);
partition.fetch_offset = fetch_offsets[p_id()];
partition.partition_index = p_id;
partition.log_start_offset = model::offset(0);
partition.max_bytes = 1_MiB;
kafka::fetch_request::topic topic;
@@ -114,8 +134,8 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
req.data.max_wait_ms = 1000ms;
req.data.topics.push_back(std::move(topic));

return consumer->dispatch(std::move(req), kafka::api_version(4))
.then([this](kafka::fetch_response resp) {
return consumer.dispatch(std::move(req), kafka::api_version(4))
.then([this, p_id](kafka::fetch_response resp) {
if (resp.data.topics.empty()) {
return resp;
}
@@ -125,18 +145,24 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
const auto& data = part.partitions.begin()->records;
if (data && !data->empty()) {
// update next fetch offset the same way as Kafka clients
fetch_offset = ++data->last_offset();
fetch_offsets[p_id()] = ++data->last_offset();
}
}
return resp;
});
}

model::offset fetch_offset{0};
std::unique_ptr<kafka::client::transport> consumer;
std::unique_ptr<kafka::client::transport> producer;
ss::future<kafka::fetch_response> fetch_next() {
return fetch_next(consumers.front(), model::partition_id{0});
}

std::vector<model::offset> fetch_offsets;
std::vector<kafka::client::transport> consumers;
std::vector<kafka::client::transport> producers;
ss::abort_source as;
const model::topic test_topic = model::topic("test-topic");
const model::topic_namespace test_tp_ns = {
model::ns("kafka"), model::topic("test-topic")};
const model::topic& test_topic = test_tp_ns.tp;
};

/**
@@ -184,8 +210,8 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
const auto unsupported_version = kafka::api_version(
kafka::produce_handler::max_supported() + 1);
BOOST_CHECK_THROW(
producer
->dispatch(
producers.front()
.dispatch(
// NOLINTNEXTLINE(bugprone-use-after-move)
kafka::produce_request(std::nullopt, 1, std::move(topics)),
unsupported_version)
@@ -194,7 +220,7 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
}

static std::vector<kafka::produce_request::partition>
single_batch(const size_t volume) {
single_batch(model::partition_id p_id, const size_t volume) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));
{
@@ -205,7 +231,7 @@ single_batch(const size_t volume) {
}

kafka::produce_request::partition partition;
partition.partition_index = model::partition_id(0);
partition.partition_index = p_id;
partition.records.emplace(std::move(builder).build());

std::vector<kafka::produce_request::partition> res;
@@ -216,6 +242,9 @@
namespace ch = std::chrono;

struct throughput_limits_fixure : prod_consume_fixture {
static constexpr size_t kafka_packet_in_overhead = 127;
static constexpr size_t kafka_packet_eg_overhead = 62;

ch::milliseconds _window_width;
ch::milliseconds _balancer_period;
int64_t _rate_minimum;
@@ -273,18 +302,17 @@ struct throughput_limits_fixure : prod_consume_fixture {
const size_t batch_size,
const int tolerance_percent) {
size_t kafka_in_data_len = 0;
constexpr size_t kafka_packet_overhead = 127;
// do not divide rate by smp::count because
// - balanced case: TP will be balanced and the entire quota will end
// up in one shard
// - static case: rate_limit is per shard
const auto batches_cnt = /* 1s * */ rate_limit_in
/ (batch_size + kafka_packet_overhead);
/ (batch_size + kafka_packet_in_overhead);
ch::steady_clock::time_point start;
ch::milliseconds throttle_time{};

for (int k = -warmup_cycles(
rate_limit_in, batch_size + kafka_packet_overhead);
rate_limit_in, batch_size + kafka_packet_in_overhead);
k != batches_cnt;
++k) {
if (k == 0) {
Expand All @@ -294,15 +322,16 @@ struct throughput_limits_fixure : prod_consume_fixture {
false,
"Ingress measurement starts. batches: " << batches_cnt);
}
throttle_time += produce_raw(single_batch(batch_size))
throttle_time += produce_raw(
single_batch(model::partition_id{0}, batch_size))
.then([](const kafka::produce_response& r) {
return r.data.throttle_time_ms;
})
.get0();
kafka_in_data_len += batch_size;
}
const auto stop = ch::steady_clock::now();
const auto wire_data_length = (batch_size + kafka_packet_overhead)
const auto wire_data_length = (batch_size + kafka_packet_in_overhead)
* batches_cnt;
const auto rate_estimated = rate_limit_in
- _rate_minimum * (ss::smp::count - 1);
@@ -324,7 +353,6 @@ struct throughput_limits_fixure : prod_consume_fixture {
const size_t batch_size,
const int tolerance_percent) {
size_t kafka_out_data_len = 0;
constexpr size_t kafka_packet_overhead = 62;
ch::steady_clock::time_point start;
size_t total_size{};
ch::milliseconds throttle_time{};
@@ -334,7 +362,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
// to fetch. We only can consume almost as much as have been produced:
const auto kafka_data_cap = kafka_data_available - batch_size * 2;
for (int k = -warmup_cycles(
rate_limit_out, batch_size + kafka_packet_overhead);
rate_limit_out, batch_size + kafka_packet_eg_overhead);
kafka_out_data_len < kafka_data_cap;
++k) {
if (k == 0) {
@@ -356,7 +384,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
.partitions[0]
.records.value()
.size_bytes();
total_size += kafka_data_len + kafka_packet_overhead;
total_size += kafka_data_len + kafka_packet_eg_overhead;
throttle_time += fetch_resp.data.throttle_time_ms;
kafka_out_data_len += kafka_data_len;
}
@@ -558,22 +586,9 @@ FIXTURE_TEST(test_quota_balancer_config_balancer_period, prod_consume_fixture) {
// TODO: move producer utilities somewhere else and give this test a proper
// home.
FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
producer = std::make_unique<kafka::client::transport>(
make_kafka_client().get0());
producer->connect().get0();
model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
add_topic(tp_ns).get0();
model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
auto shard = app.shard_table.local().shard_for(ntp);
if (!shard) {
return ss::make_ready_future<bool>(false);
}
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& pm) {
return pm.get(ntp)->is_leader();
});
}).get0();
wait_for_controller_leadership().get();
start();
model::ntp ntp(test_tp_ns.ns, test_tp_ns.tp, model::partition_id(0));
auto shard = app.shard_table.local().shard_for(ntp);
for (int i = 0; i < 3; i++) {
// Refresh leadership.
Expand Down Expand Up @@ -613,8 +628,7 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {

// Make a request getting the offset from a term below the start of the
// log.
auto client = make_kafka_client().get0();
client.connect().get();
auto& client = consumers.front();
auto current_term = app.partition_manager
.invoke_on(
*shard,
@@ -659,9 +673,8 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
FIXTURE_TEST(test_basic_delete_around_batch, prod_consume_fixture) {
wait_for_controller_leadership().get0();
start();
const model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
const model::partition_id pid(0);
const model::ntp ntp(tp_ns.ns, tp_ns.tp, pid);
const model::ntp ntp(test_tp_ns.ns, test_tp_ns.tp, pid);
auto partition = app.partition_manager.local().get(ntp);
auto log = partition->log();

Expand Down Expand Up @@ -796,8 +809,7 @@ FIXTURE_TEST(test_produce_bad_timestamps, prod_consume_fixture) {

wait_for_controller_leadership().get0();
start();
auto ntp = model::ntp(
model::ns("kafka"), test_topic, model::partition_id(0));
auto ntp = model::ntp(test_tp_ns.ns, test_tp_ns.tp, model::partition_id(0));

auto producer = tests::kafka_produce_transport(make_kafka_client().get());
producer.start().get();
Expand Down

0 comments on commit 20ca50a

Please sign in to comment.
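For scale, a worked example of the ingress measurement loop above (illustrative numbers, not from the commit): with rate_limit_in = 1 MiB/s = 1,048,576 bytes/s and batch_size = 4096, each produced batch costs batch_size + kafka_packet_in_overhead = 4096 + 127 = 4223 bytes on the wire, so batches_cnt = 1,048,576 / 4223 ≈ 248 batches to consume one second's worth of quota.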