From 20ca50ab871fbe7e1b009fd4687221596bfd9ba7 Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Wed, 31 Jan 2024 16:22:59 +0000
Subject: [PATCH] kafka/test: Allow multiple consumers, producers, and
 partitions

Refactor only, no functional change.

Signed-off-by: Ben Pope
---
 .../server/tests/produce_consume_test.cc      | 146 ++++++++++--------
 1 file changed, 79 insertions(+), 67 deletions(-)

diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc
index b599dc52b1cce..d49e368d717ba 100644
--- a/src/v/kafka/server/tests/produce_consume_test.cc
+++ b/src/v/kafka/server/tests/produce_consume_test.cc
@@ -33,26 +33,39 @@ using std::vector;
 using tests::kv_t;
 
 struct prod_consume_fixture : public redpanda_thread_fixture {
-    void start() {
-        consumer = std::make_unique<kafka::client::transport>(
-          make_kafka_client().get0());
-        producer = std::make_unique<kafka::client::transport>(
-          make_kafka_client().get0());
-        consumer->connect().get0();
-        producer->connect().get0();
-        model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
-        add_topic(tp_ns).get0();
-        model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
-        tests::cooperative_spin_wait_with_timeout(2s, [ntp, this] {
-            auto shard = app.shard_table.local().shard_for(ntp);
-            if (!shard) {
-                return ss::make_ready_future<bool>(false);
-            }
-            return app.partition_manager.invoke_on(
-              *shard, [ntp](cluster::partition_manager& pm) {
-                  return pm.get(ntp)->is_leader();
-              });
-        }).get0();
+    void start(unsigned int count = 1) {
+        producers.reserve(count);
+        consumers.reserve(count);
+        fetch_offsets.resize(count, model::offset{0});
+
+        add_topic(test_tp_ns, static_cast<int>(count)).get();
+
+        ss::parallel_for_each(boost::irange(0u, count), [&](auto i) {
+            consumers.emplace_back(make_kafka_client().get());
+            auto& consumer = consumers.back();
+
+            producers.emplace_back(make_kafka_client().get());
+            auto& producer = producers.back();
+
+            model::ntp ntp(
+              test_tp_ns.ns, test_tp_ns.tp, model::partition_id(i));
+            return ss::when_all_succeed(
+                     producer.connect(),
+                     consumer.connect(),
+                     tests::cooperative_spin_wait_with_timeout(
+                       2s,
+                       [ntp, this] {
+                           auto shard = app.shard_table.local().shard_for(ntp);
+                           if (!shard) {
+                               return ss::make_ready_future<bool>(false);
+                           }
+                           return app.partition_manager.invoke_on(
+                             *shard, [ntp](cluster::partition_manager& pm) {
+                                 return pm.get(ntp)->is_leader();
+                             });
+                       }))
+              .discard_result();
+        }).get();
     }
 
     std::vector<kafka::produce_request::partition> small_batches(size_t count) {
@@ -74,8 +87,9 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         return res;
     }
 
-    ss::future<kafka::produce_response>
-    produce_raw(std::vector<kafka::produce_request::partition>&& partitions) {
+    ss::future<kafka::produce_response> produce_raw(
+      kafka::client::transport& producer,
+      std::vector<kafka::produce_request::partition>&& partitions) {
         kafka::produce_request::topic tp;
         tp.partitions = std::move(partitions);
         tp.name = test_topic;
@@ -85,7 +99,12 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         req.data.timeout_ms = std::chrono::seconds(2);
         req.has_idempotent = false;
         req.has_transactional = false;
-        return producer->dispatch(std::move(req));
+        return producer.dispatch(std::move(req));
+    }
+
+    ss::future<kafka::produce_response>
+    produce_raw(std::vector<kafka::produce_request::partition>&& partitions) {
+        return produce_raw(producers.front(), std::move(partitions));
     }
 
     template<typename T>
@@ -98,10 +117,11 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         });
     }
 
-    ss::future<kafka::fetch_response> fetch_next() {
+    ss::future<kafka::fetch_response>
+    fetch_next(kafka::client::transport& consumer, model::partition_id p_id) {
         kafka::fetch_request::partition partition;
-        partition.fetch_offset = fetch_offset;
-        partition.partition_index = model::partition_id(0);
+        partition.fetch_offset = fetch_offsets[p_id()];
+        partition.partition_index = p_id;
         partition.log_start_offset = model::offset(0);
         partition.max_bytes = 1_MiB;
         kafka::fetch_request::topic topic;
@@ -114,8 +134,8 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         req.data.max_wait_ms = 1000ms;
         req.data.topics.push_back(std::move(topic));
 
-        return consumer->dispatch(std::move(req), kafka::api_version(4))
-          .then([this](kafka::fetch_response resp) {
+        return consumer.dispatch(std::move(req), kafka::api_version(4))
+          .then([this, p_id](kafka::fetch_response resp) {
               if (resp.data.topics.empty()) {
                   return resp;
               }
@@ -125,18 +145,24 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
                   const auto& data = part.partitions.begin()->records;
                   if (data && !data->empty()) {
                       // update next fetch offset the same way as Kafka clients
-                      fetch_offset = ++data->last_offset();
+                      fetch_offsets[p_id()] = ++data->last_offset();
                   }
               }
               return resp;
           });
     }
 
-    model::offset fetch_offset{0};
-    std::unique_ptr<kafka::client::transport> consumer;
-    std::unique_ptr<kafka::client::transport> producer;
+    ss::future<kafka::fetch_response> fetch_next() {
+        return fetch_next(consumers.front(), model::partition_id{0});
+    }
+
+    std::vector<model::offset> fetch_offsets;
+    std::vector<kafka::client::transport> consumers;
+    std::vector<kafka::client::transport> producers;
     ss::abort_source as;
-    const model::topic test_topic = model::topic("test-topic");
+    const model::topic_namespace test_tp_ns = {
+      model::ns("kafka"), model::topic("test-topic")};
+    const model::topic& test_topic = test_tp_ns.tp;
 };
 
 /**
@@ -184,8 +210,8 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
     const auto unsupported_version = kafka::api_version(
       kafka::produce_handler::max_supported() + 1);
     BOOST_CHECK_THROW(
-      producer
-        ->dispatch(
+      producers.front()
+        .dispatch(
           // NOLINTNEXTLINE(bugprone-use-after-move)
          kafka::produce_request(std::nullopt, 1, std::move(topics)),
          unsupported_version)
@@ -194,7 +220,7 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
 }
 
 static std::vector<kafka::produce_request::partition>
-single_batch(const size_t volume) {
+single_batch(model::partition_id p_id, const size_t volume) {
     storage::record_batch_builder builder(
       model::record_batch_type::raft_data, model::offset(0));
     {
@@ -205,7 +231,7 @@
     }
 
     kafka::produce_request::partition partition;
-    partition.partition_index = model::partition_id(0);
+    partition.partition_index = p_id;
    partition.records.emplace(std::move(builder).build());
 
    std::vector<kafka::produce_request::partition> res;
@@ -216,6 +242,9 @@
 namespace ch = std::chrono;
 
 struct throughput_limits_fixure : prod_consume_fixture {
+    static constexpr size_t kafka_packet_in_overhead = 127;
+    static constexpr size_t kafka_packet_eg_overhead = 62;
+
     ch::milliseconds _window_width;
     ch::milliseconds _balancer_period;
     int64_t _rate_minimum;
@@ -273,18 +302,17 @@ struct throughput_limits_fixure : prod_consume_fixture {
       const size_t batch_size,
      const int tolerance_percent) {
         size_t kafka_in_data_len = 0;
-        constexpr size_t kafka_packet_overhead = 127;
 
         // do not divide rate by smp::count because
         // - balanced case: TP will be balanced and the entire quota will end
         //   up in one shard
         // - static case: rate_limit is per shard
         const auto batches_cnt = /* 1s * */ rate_limit_in
-                                 / (batch_size + kafka_packet_overhead);
+                                 / (batch_size + kafka_packet_in_overhead);
         ch::steady_clock::time_point start;
         ch::milliseconds throttle_time{};
         for (int k = -warmup_cycles(
-               rate_limit_in, batch_size + kafka_packet_overhead);
+               rate_limit_in, batch_size + kafka_packet_in_overhead);
             k != batches_cnt;
             ++k) {
            if (k == 0) {
@@ -294,7 +322,8 @@ struct throughput_limits_fixure : prod_consume_fixture {
                  false,
                  "Ingress measurement starts. batches: " << batches_cnt);
            }
-            throttle_time += produce_raw(single_batch(batch_size))
+            throttle_time += produce_raw(
+                               single_batch(model::partition_id{0}, batch_size))
                               .then([](const kafka::produce_response& r) {
                                   return r.data.throttle_time_ms;
                               })
@@ -302,7 +331,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
            kafka_in_data_len += batch_size;
        }
        const auto stop = ch::steady_clock::now();
-        const auto wire_data_length = (batch_size + kafka_packet_overhead)
+        const auto wire_data_length = (batch_size + kafka_packet_in_overhead)
                                       * batches_cnt;
        const auto rate_estimated = rate_limit_in
                                    - _rate_minimum * (ss::smp::count - 1);
@@ -324,7 +353,6 @@ struct throughput_limits_fixure : prod_consume_fixture {
      const size_t batch_size,
      const int tolerance_percent) {
        size_t kafka_out_data_len = 0;
-        constexpr size_t kafka_packet_overhead = 62;
        ch::steady_clock::time_point start;
        size_t total_size{};
        ch::milliseconds throttle_time{};
@@ -334,7 +362,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
        // to fetch. We only can consume almost as much as have been produced:
        const auto kafka_data_cap = kafka_data_available - batch_size * 2;
        for (int k = -warmup_cycles(
-               rate_limit_out, batch_size + kafka_packet_overhead);
+               rate_limit_out, batch_size + kafka_packet_eg_overhead);
            kafka_out_data_len < kafka_data_cap;
            ++k) {
            if (k == 0) {
@@ -356,7 +384,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
                                .partitions[0]
                                .records.value()
                                .size_bytes();
-            total_size += kafka_data_len + kafka_packet_overhead;
+            total_size += kafka_data_len + kafka_packet_eg_overhead;
            throttle_time += fetch_resp.data.throttle_time_ms;
            kafka_out_data_len += kafka_data_len;
        }
@@ -558,22 +586,9 @@ FIXTURE_TEST(test_quota_balancer_config_balancer_period, prod_consume_fixture) {
 // TODO: move producer utilities somewhere else and give this test a proper
 // home.
 FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
-    producer = std::make_unique<kafka::client::transport>(
-      make_kafka_client().get0());
-    producer->connect().get0();
-    model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
-    add_topic(tp_ns).get0();
-    model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
-    tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
-        auto shard = app.shard_table.local().shard_for(ntp);
-        if (!shard) {
-            return ss::make_ready_future<bool>(false);
-        }
-        return app.partition_manager.invoke_on(
-          *shard, [ntp](cluster::partition_manager& pm) {
-              return pm.get(ntp)->is_leader();
-          });
-    }).get0();
+    wait_for_controller_leadership().get();
+    start();
+    model::ntp ntp(test_tp_ns.ns, test_tp_ns.tp, model::partition_id(0));
     auto shard = app.shard_table.local().shard_for(ntp);
     for (int i = 0; i < 3; i++) {
         // Refresh leadership.
@@ -613,8 +628,7 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
 
     // Make a request getting the offset from a term below the start of the
     // log.
-    auto client = make_kafka_client().get0();
-    client.connect().get();
+    auto& client = consumers.front();
     auto current_term = app.partition_manager
                           .invoke_on(
                             *shard,
@@ -659,9 +673,8 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
 FIXTURE_TEST(test_basic_delete_around_batch, prod_consume_fixture) {
     wait_for_controller_leadership().get0();
     start();
-    const model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
     const model::partition_id pid(0);
-    const model::ntp ntp(tp_ns.ns, tp_ns.tp, pid);
+    const model::ntp ntp(test_tp_ns.ns, test_tp_ns.tp, pid);
     auto partition = app.partition_manager.local().get(ntp);
     auto log = partition->log();
 
@@ -796,8 +809,7 @@ FIXTURE_TEST(test_produce_bad_timestamps, prod_consume_fixture) {
     wait_for_controller_leadership().get0();
     start();
 
-    auto ntp = model::ntp(
-      model::ns("kafka"), test_topic, model::partition_id(0));
+    auto ntp = model::ntp(test_tp_ns.ns, test_tp_ns.tp, model::partition_id(0));
 
     auto producer = tests::kafka_produce_transport(make_kafka_client().get());
     producer.start().get();
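
Illustrative usage note (not part of the patch): a minimal sketch of how a test might drive the
refactored fixture across several partitions, assuming only the API introduced above
(start(count), the producers/consumers vectors, the partition-aware produce_raw/fetch_next
overloads, and the updated single_batch). The test name and batch size are hypothetical.

// Sketch only: three partitions, each with its own producer and consumer transport.
FIXTURE_TEST(example_multi_partition_roundtrip, prod_consume_fixture) {
    wait_for_controller_leadership().get();
    start(3); // creates 3 producers, 3 consumers, and a 3-partition topic

    for (unsigned i = 0; i < 3; ++i) {
        const model::partition_id p_id(static_cast<int32_t>(i));
        // produce one small batch to partition i through its own transport
        produce_raw(producers[i], single_batch(p_id, 100)).get();
        // fetch it back with the matching consumer; the per-partition fetch
        // offset is advanced in fetch_offsets[p_id()]
        auto resp = fetch_next(consumers[i], p_id).get();
        BOOST_REQUIRE(!resp.data.topics.empty());
    }
}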