From 8ebd27d60312378fae8bfd9829f2ccbbf8c65f3e Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Sun, 18 Feb 2024 20:51:03 +0000
Subject: [PATCH 1/5] ssx: Introduce sharded_ptr

Signed-off-by: Ben Pope
(cherry picked from commit bd424798c49f43916b08eb3d62d5617b8abd5607)

Conflicts:
	src/v/ssx/sharded_ptr.h (header include)
	src/v/ssx/tests/sharded_ptr_test.cc (includes)
---
 src/v/ssx/sharded_ptr.h             | 126 ++++++++++++++++++++++++++++
 src/v/ssx/tests/CMakeLists.txt      |   1 +
 src/v/ssx/tests/sharded_ptr_test.cc | 105 +++++++++++++++++++++++
 3 files changed, 232 insertions(+)
 create mode 100644 src/v/ssx/sharded_ptr.h
 create mode 100644 src/v/ssx/tests/sharded_ptr_test.cc

diff --git a/src/v/ssx/sharded_ptr.h b/src/v/ssx/sharded_ptr.h
new file mode 100644
index 0000000000000..40ceee5346688
--- /dev/null
+++ b/src/v/ssx/sharded_ptr.h
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+#pragma once
+
+#include "seastarx.h"
+#include "utils/mutex.h"
+#include "vassert.h"
+
+#include <seastar/core/future.hh>
+#include <seastar/core/smp.hh>
+
+#include <memory>
+
+namespace ssx {
+
+/// A pointer that is safe to share between shards.
+///
+/// The pointer can be reset only by the home shard; other shards shall not
+/// observe the change until the update has run on their reactor.
+///
+/// As such, it is safe to maintain a pointer or reference to the pointee on
+/// any shard until the next yield point.
+template<typename T>
+class sharded_ptr {
+public:
+    sharded_ptr()
+      : _shard{ss::this_shard_id()} {}
+    ~sharded_ptr() noexcept = default;
+
+    sharded_ptr(sharded_ptr&& other) noexcept = default;
+    sharded_ptr& operator=(sharded_ptr&&) noexcept = default;
+
+    sharded_ptr(sharded_ptr const&) = delete;
+    sharded_ptr& operator=(sharded_ptr const&) = delete;
+
+    /// dereferences pointer to the managed object for the local shard.
+    ///
+    /// reset must have been called at least once.
+    /// stop must not have been called.
+    T& operator*() const { return local().operator*(); }
+
+    /// dereferences pointer to the managed object for the local shard.
+    ///
+    /// reset must have been called at least once.
+    /// stop must not have been called.
+    T* operator->() const { return local().operator->(); }
+
+    /// checks if there is an associated managed object on the local shard.
+    ///
+    /// This is safe to call at any time on any shard.
+    explicit operator bool() const {
+        return _state.size() > ss::this_shard_id() && local() != nullptr;
+    }
+
+    /// replaces the managed object.
+    ///
+    /// Must be called on the home shard and is safe to call concurrently.
+    ss::future<> reset(std::shared_ptr<T> u = nullptr) {
+        assert_shard();
+        auto mu{co_await _mutex.get_units()};
+        if (_state.empty()) {
+            _state.resize(ss::smp::count);
+        }
+
+        co_await ss::smp::invoke_on_all([this, u]() noexcept { local() = u; });
+    }
+
+    /// replaces the managed object by constructing a new one.
+    ///
+    /// Must be called on the home shard and is safe to call concurrently.
+    /// returns an ss::broken_semaphore if stop() has been called.
+    template<typename... Args>
+    ss::future<> reset(Args&&... args) {
+        return reset(std::make_shared<T>(std::forward<Args>(args)...));
+    }
+
+    /// stop managing any object.
+    ///
+    /// Must be called on the home shard and is safe to call concurrently.
+    /// returns an ss::broken_semaphore if stop() has been called.
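+    ///
+    /// Note: shared_ptr copies previously obtained via local() remain valid
+    /// and keep the pointee alive; stop() clears the per-shard pointers and
+    /// breaks the internal mutex.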
+    ss::future<> stop() {
+        co_await _mutex.with([this] { _mutex.broken(); });
+        _state = {};
+    }
+
+    /// return the home shard.
+    ///
+    /// This is safe to call at any time on any shard.
+    auto shard_id() const { return _shard; }
+
+    /// get a reference to the local instance
+    ///
+    /// reset must have been called at least once.
+    /// stop must not have been called.
+    std::shared_ptr<T> const& local() const {
+        return _state[ss::this_shard_id()];
+    }
+
+    /// get a reference to the local instance
+    ///
+    /// reset must have been called at least once.
+    /// stop must not have been called.
+    std::shared_ptr<T>& local() { return _state[ss::this_shard_id()]; }
+
+private:
+    void assert_shard() const {
+        vassert(
+          ss::this_shard_id() == _shard,
+          "reset must be called on home shard: {}",
+          _shard);
+    }
+    ss::shard_id _shard;
+    std::vector<std::shared_ptr<T>> _state;
+    mutex _mutex;
+};
+
+} // namespace ssx
diff --git a/src/v/ssx/tests/CMakeLists.txt b/src/v/ssx/tests/CMakeLists.txt
index b69109c6188e7..35e4bd14c9e81 100644
--- a/src/v/ssx/tests/CMakeLists.txt
+++ b/src/v/ssx/tests/CMakeLists.txt
@@ -30,6 +30,7 @@ rp_test(
   BINARY_NAME ssx_multi_thread
   SOURCES
     abort_source_test.cc
+    sharded_ptr_test.cc
   LIBRARIES v::seastar_testing_main
   ARGS "-- -c 2"
   LABELS ssx
diff --git a/src/v/ssx/tests/sharded_ptr_test.cc b/src/v/ssx/tests/sharded_ptr_test.cc
new file mode 100644
index 0000000000000..4392a2cbde50c
--- /dev/null
+++ b/src/v/ssx/tests/sharded_ptr_test.cc
@@ -0,0 +1,105 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#include "seastarx.h"
+#include "ssx/sharded_ptr.h"
+
+#include <seastar/core/smp.hh>
+#include <seastar/testing/thread_test_case.hh>
+#include <boost/range/irange.hpp>
+
+#include <memory>
+
+SEASTAR_THREAD_TEST_CASE(test_sharded_ptr_basic_ops) {
+    ssx::sharded_ptr<int> p0;
+    BOOST_REQUIRE(!p0);
+    BOOST_REQUIRE_EQUAL(p0.shard_id(), ss::this_shard_id());
+
+    // Test operator bool (before reset)
+    for (auto i : boost::irange(0u, ss::smp::count)) {
+        ss::smp::submit_to(i, [&]() { BOOST_REQUIRE(!p0); }).get();
+    }
+
+    // Test reset
+    p0.reset(std::make_shared<int>(43)).get();
+    p0.reset(43).get();
+
+    // Test operator bool and deref (after reset)
+    for (auto i : boost::irange(0u, ss::smp::count)) {
+        ss::smp::submit_to(i, [&]() {
+            BOOST_REQUIRE(p0 && p0.operator*() == 43);
+            BOOST_REQUIRE(p0 && *p0.operator->() == 43);
+        }).get();
+    }
+
+    // Test operator bool (after stop)
+    p0.stop().get();
+    for (auto i : boost::irange(0u, ss::smp::count)) {
+        ss::smp::submit_to(i, [&]() { BOOST_REQUIRE(!p0); }).get();
+    }
+
+    // Test reset (after stop)
+    try {
+        p0.reset().get();
+        BOOST_FAIL("Expected exception");
+    } catch (ss::broken_semaphore const&) {
+        // Success
+    } catch (...) {
+        BOOST_FAIL("Unexpected exception");
+    }
+
+    // Test stop (after stop)
+    try {
+        p0.stop().get();
+        BOOST_FAIL("Expected exception");
+    } catch (ss::broken_semaphore const&) {
+        // Success
+    } catch (...) {
+        BOOST_FAIL("Unexpected exception");
+    }
+}
+
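+// It must be safe to call stop() even when reset() has never been called.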
+SEASTAR_THREAD_TEST_CASE(test_sharded_ptr_stop_without_reset) {
+    ssx::sharded_ptr<int> p0;
+    p0.stop().get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_sharded_ptr_shared) {
+    ssx::sharded_ptr<int> p0;
+    p0.reset(42).get();
+
+    std::shared_ptr<int> shared = p0.local();
+    std::weak_ptr<int> weak = p0.local();
+    BOOST_REQUIRE(p0 && *p0 == 42);
+
+    p0.reset().get();
+    BOOST_REQUIRE(shared.get() != nullptr);
+    BOOST_REQUIRE(weak.lock().get() != nullptr);
+
+    shared.reset();
+    BOOST_REQUIRE(shared.get() == nullptr);
+    BOOST_REQUIRE(weak.lock().get() == nullptr);
+}
+
+SEASTAR_THREAD_TEST_CASE(test_sharded_ptr_move) {
+    ssx::sharded_ptr<int> p0;
+    p0.reset(42).get();
+
+    std::shared_ptr<int> shared = p0.local();
+
+    // Move construction
+    auto p1{std::move(p0)};
+    BOOST_REQUIRE(shared && *shared == 42);
+    BOOST_REQUIRE(p1 && p1.local() && *p1 == 42);
+
+    // Move assignment
+    p0 = std::move(p1);
+    BOOST_REQUIRE(shared && *shared == 42);
+    BOOST_REQUIRE(p0 && p0.local() && *p0 == 42);
+}

From 62e2b532e7dc9837f4e793baa0fe3e28643b6a62 Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Wed, 31 Jan 2024 16:22:59 +0000
Subject: [PATCH 2/5] kafka/test: Allow multiple consumers, producers, and
 partitions

Refactor only, no functional change.

Signed-off-by: Ben Pope
(cherry picked from commit 20ca50ab871fbe7e1b009fd4687221596bfd9ba7)
---
 .../server/tests/produce_consume_test.cc      | 146 ++++++++++--------
 1 file changed, 79 insertions(+), 67 deletions(-)

diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc
index b599dc52b1cce..d49e368d717ba 100644
--- a/src/v/kafka/server/tests/produce_consume_test.cc
+++ b/src/v/kafka/server/tests/produce_consume_test.cc
@@ -33,26 +33,39 @@ using std::vector;
 using tests::kv_t;
 
 struct prod_consume_fixture : public redpanda_thread_fixture {
-    void start() {
-        consumer = std::make_unique<kafka::client::transport>(
-          make_kafka_client().get0());
-        producer = std::make_unique<kafka::client::transport>(
-          make_kafka_client().get0());
-        consumer->connect().get0();
-        producer->connect().get0();
-        model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
-        add_topic(tp_ns).get0();
-        model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
-        tests::cooperative_spin_wait_with_timeout(2s, [ntp, this] {
-            auto shard = app.shard_table.local().shard_for(ntp);
-            if (!shard) {
-                return ss::make_ready_future<bool>(false);
-            }
-            return app.partition_manager.invoke_on(
-              *shard, [ntp](cluster::partition_manager& pm) {
-                  return pm.get(ntp)->is_leader();
-              });
-        }).get0();
+    void start(unsigned int count = 1) {
+        producers.reserve(count);
+        consumers.reserve(count);
+        fetch_offsets.resize(count, model::offset{0});
+
+        add_topic(test_tp_ns, static_cast<int>(count)).get();
+
+        ss::parallel_for_each(boost::irange(0u, count), [&](auto i) {
+            consumers.emplace_back(make_kafka_client().get());
+            auto& consumer = consumers.back();
+
+            producers.emplace_back(make_kafka_client().get());
+            auto& producer = producers.back();
+
+            model::ntp ntp(
+              test_tp_ns.ns, test_tp_ns.tp, model::partition_id(i));
+            return ss::when_all_succeed(
+                     producer.connect(),
+                     consumer.connect(),
+                     tests::cooperative_spin_wait_with_timeout(
+                       2s,
+                       [ntp, this] {
+                           auto shard = app.shard_table.local().shard_for(ntp);
+                           if (!shard) {
+                               return ss::make_ready_future<bool>(false);
+                           }
+                           return app.partition_manager.invoke_on(
+                             *shard, [ntp](cluster::partition_manager& pm) {
+                                 return pm.get(ntp)->is_leader();
+                             });
+                       }))
+              .discard_result();
+        }).get();
     }
 
     std::vector<kafka::produce_request::partition> small_batches(size_t count) {
@@ -74,8 +87,9 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         return res;
     }
 
-    ss::future<kafka::produce_response>
-    produce_raw(std::vector<kafka::produce_request::partition>&& partitions) {
+    ss::future<kafka::produce_response> produce_raw(
+      kafka::client::transport& producer,
+      std::vector<kafka::produce_request::partition>&& partitions) {
         kafka::produce_request::topic tp;
         tp.partitions = std::move(partitions);
         tp.name = test_topic;
@@ -85,7 +99,12 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         req.data.timeout_ms = std::chrono::seconds(2);
         req.has_idempotent = false;
         req.has_transactional = false;
-        return producer->dispatch(std::move(req));
+        return producer.dispatch(std::move(req));
+    }
+
+    ss::future<kafka::produce_response>
+    produce_raw(std::vector<kafka::produce_request::partition>&& partitions) {
+        return produce_raw(producers.front(), std::move(partitions));
     }
 
     template<typename T>
@@ -98,10 +117,11 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         });
     }
 
-    ss::future<kafka::fetch_response> fetch_next() {
+    ss::future<kafka::fetch_response>
+    fetch_next(kafka::client::transport& consumer, model::partition_id p_id) {
         kafka::fetch_request::partition partition;
-        partition.fetch_offset = fetch_offset;
-        partition.partition_index = model::partition_id(0);
+        partition.fetch_offset = fetch_offsets[p_id()];
+        partition.partition_index = p_id;
         partition.log_start_offset = model::offset(0);
         partition.max_bytes = 1_MiB;
         kafka::fetch_request::topic topic;
@@ -114,8 +134,8 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
         req.data.max_wait_ms = 1000ms;
         req.data.topics.push_back(std::move(topic));
 
-        return consumer->dispatch(std::move(req), kafka::api_version(4))
-          .then([this](kafka::fetch_response resp) {
+        return consumer.dispatch(std::move(req), kafka::api_version(4))
+          .then([this, p_id](kafka::fetch_response resp) {
               if (resp.data.topics.empty()) {
                   return resp;
              }
@@ -125,18 +145,24 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
              const auto& data = part.partitions.begin()->records;
              if (data && !data->empty()) {
                  // update next fetch offset the same way as Kafka clients
-                  fetch_offset = ++data->last_offset();
+                  fetch_offsets[p_id()] = ++data->last_offset();
              }
          }
          return resp;
        });
    }
 
-    model::offset fetch_offset{0};
-    std::unique_ptr<kafka::client::transport> consumer;
-    std::unique_ptr<kafka::client::transport> producer;
+    ss::future<kafka::fetch_response> fetch_next() {
+        return fetch_next(consumers.front(), model::partition_id{0});
+    }
+
+    std::vector<model::offset> fetch_offsets;
+    std::vector<kafka::client::transport> consumers;
+    std::vector<kafka::client::transport> producers;
     ss::abort_source as;
-    const model::topic test_topic = model::topic("test-topic");
+    const model::topic_namespace test_tp_ns = {
+      model::ns("kafka"), model::topic("test-topic")};
+    const model::topic& test_topic = test_tp_ns.tp;
 };
 
 /**
@@ -184,8 +210,8 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
     const auto unsupported_version = kafka::api_version(
      kafka::produce_handler::max_supported() + 1);
     BOOST_CHECK_THROW(
-      producer
-        ->dispatch(
+      producers.front()
+        .dispatch(
          // NOLINTNEXTLINE(bugprone-use-after-move)
          kafka::produce_request(std::nullopt, 1, std::move(topics)),
          unsupported_version)
        .get(),
 }
 
 static std::vector<kafka::produce_request::partition>
-single_batch(const size_t volume) {
+single_batch(model::partition_id p_id, const size_t volume) {
     storage::record_batch_builder builder(
      model::record_batch_type::raft_data, model::offset(0));
     {
@@ -205,7 +231,7 @@ single_batch(const size_t volume) {
     }
 
     kafka::produce_request::partition partition;
-    partition.partition_index = model::partition_id(0);
+    partition.partition_index = p_id;
     partition.records.emplace(std::move(builder).build());
 
     std::vector<kafka::produce_request::partition> res;
@@ -216,6 +242,9 @@
 namespace ch = std::chrono;
 
 struct throughput_limits_fixure : prod_consume_fixture {
+    static constexpr size_t kafka_packet_in_overhead = 127;
+    static constexpr size_t kafka_packet_eg_overhead = 62;
+
     ch::milliseconds _window_width;
     ch::milliseconds _balancer_period;
     int64_t _rate_minimum;
@@ -273,18 +302,17 @@ struct throughput_limits_fixure : prod_consume_fixture {
      const size_t batch_size,
      const int tolerance_percent) {
        size_t kafka_in_data_len = 0;
-        constexpr size_t kafka_packet_overhead = 127;
        // do not divide rate by smp::count because
        // - balanced case: TP will be balanced and the entire quota will end
        //   up in one shard
        // - static case: rate_limit is per shard
        const auto batches_cnt = /* 1s * */ rate_limit_in
-                                 / (batch_size + kafka_packet_overhead);
+                                 / (batch_size + kafka_packet_in_overhead);
        ch::steady_clock::time_point start;
        ch::milliseconds throttle_time{};
        for (int k = -warmup_cycles(
-               rate_limit_in, batch_size + kafka_packet_overhead);
+               rate_limit_in, batch_size + kafka_packet_in_overhead);
             k != batches_cnt;
             ++k) {
            if (k == 0) {
@@ -294,7 +322,8 @@ struct throughput_limits_fixure : prod_consume_fixture {
                  false,
                  "Ingress measurement starts. batches: " << batches_cnt);
            }
-            throttle_time += produce_raw(single_batch(batch_size))
+            throttle_time += produce_raw(
+                               single_batch(model::partition_id{0}, batch_size))
                               .then([](const kafka::produce_response& r) {
                                   return r.data.throttle_time_ms;
                               })
@@ -302,7 +331,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
            kafka_in_data_len += batch_size;
        }
        const auto stop = ch::steady_clock::now();
-        const auto wire_data_length = (batch_size + kafka_packet_overhead)
+        const auto wire_data_length = (batch_size + kafka_packet_in_overhead)
                                      * batches_cnt;
        const auto rate_estimated = rate_limit_in
                                    - _rate_minimum * (ss::smp::count - 1);
@@ -324,7 +353,6 @@ struct throughput_limits_fixure : prod_consume_fixture {
      const size_t batch_size,
      const int tolerance_percent) {
        size_t kafka_out_data_len = 0;
-        constexpr size_t kafka_packet_overhead = 62;
        ch::steady_clock::time_point start;
        size_t total_size{};
        ch::milliseconds throttle_time{};
@@ -334,7 +362,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
        // to fetch. We only can consume almost as much as have been produced:
        const auto kafka_data_cap = kafka_data_available - batch_size * 2;
        for (int k = -warmup_cycles(
-               rate_limit_out, batch_size + kafka_packet_overhead);
+               rate_limit_out, batch_size + kafka_packet_eg_overhead);
             kafka_out_data_len < kafka_data_cap;
             ++k) {
            if (k == 0) {
@@ -356,7 +384,7 @@ struct throughput_limits_fixure : prod_consume_fixture {
                            .partitions[0]
                            .records.value()
                            .size_bytes();
-            total_size += kafka_data_len + kafka_packet_overhead;
+            total_size += kafka_data_len + kafka_packet_eg_overhead;
            throttle_time += fetch_resp.data.throttle_time_ms;
            kafka_out_data_len += kafka_data_len;
        }
@@ -558,22 +586,9 @@ FIXTURE_TEST(test_quota_balancer_config_balancer_period, prod_consume_fixture) {
 
 // TODO: move producer utilities somewhere else and give this test a proper
 // home.
 FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
-    producer = std::make_unique<kafka::client::transport>(
-      make_kafka_client().get0());
-    producer->connect().get0();
-    model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
-    add_topic(tp_ns).get0();
-    model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
-    tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
-        auto shard = app.shard_table.local().shard_for(ntp);
-        if (!shard) {
-            return ss::make_ready_future<bool>(false);
-        }
-        return app.partition_manager.invoke_on(
-          *shard, [ntp](cluster::partition_manager& pm) {
-              return pm.get(ntp)->is_leader();
-          });
-    }).get0();
+    wait_for_controller_leadership().get();
+    start();
+    model::ntp ntp(test_tp_ns.ns, test_tp_ns.tp, model::partition_id(0));
     auto shard = app.shard_table.local().shard_for(ntp);
     for (int i = 0; i < 3; i++) {
         // Refresh leadership.
@@ -613,8 +628,7 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
 
     // Make a request getting the offset from a term below the start of the
     // log.
-    auto client = make_kafka_client().get0();
-    client.connect().get();
+    auto& client = consumers.front();
     auto current_term = app.partition_manager
                           .invoke_on(
                             *shard,
@@ -659,9 +673,8 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
 FIXTURE_TEST(test_basic_delete_around_batch, prod_consume_fixture) {
     wait_for_controller_leadership().get0();
     start();
-    const model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
     const model::partition_id pid(0);
-    const model::ntp ntp(tp_ns.ns, tp_ns.tp, pid);
+    const model::ntp ntp(test_tp_ns.ns, test_tp_ns.tp, pid);
     auto partition = app.partition_manager.local().get(ntp);
     auto log = partition->log();
@@ -796,8 +809,7 @@ FIXTURE_TEST(test_produce_bad_timestamps, prod_consume_fixture) {
     wait_for_controller_leadership().get0();
     start();
 
-    auto ntp = model::ntp(
-      model::ns("kafka"), test_topic, model::partition_id(0));
+    auto ntp = model::ntp(test_tp_ns.ns, test_tp_ns.tp, model::partition_id(0));
     auto producer = tests::kafka_produce_transport(make_kafka_client().get());
     producer.start().get();

From a3d88bd32659686557265d736b549987e2826e89 Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Wed, 21 Feb 2024 19:11:25 +0000
Subject: [PATCH 3/5] kafka/server/snc_quota_manager: Add metrics

New metrics:
* `traffic_egress` - mirrors `traffic_intake`
* `throttle_time` - Histogram of throttle time requested

Signed-off-by: Ben Pope
(cherry picked from commit 679804c8f07c7a983a158d36d37a8680628cda3d)
---
 src/v/kafka/server/snc_quota_manager.cc | 18 +++++++++++++++++-
 src/v/kafka/server/snc_quota_manager.h  | 20 +++++++++++++++++---
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc
index e3de91405ba83..6cbc60c6aa0ce 100644
--- a/src/v/kafka/server/snc_quota_manager.cc
+++ b/src/v/kafka/server/snc_quota_manager.cc
@@ -19,6 +19,7 @@
 
 #include
 #include
+#include
 #include
 #include
 #include
@@ -94,10 +95,21 @@ void snc_quotas_probe::setup_metrics() {
     }
     metric_defs.emplace_back(sm::make_counter(
      "traffic_intake",
-      _traffic_in,
+      _traffic.in,
      sm::description("Amount of Kafka traffic received from the clients "
                      "that is taken into processing, in bytes")));
 
+    metric_defs.emplace_back(sm::make_counter(
+      "traffic_egress",
+      _traffic.eg,
+      sm::description("Amount of Kafka traffic published to the clients "
+                      "that was taken into processing, in bytes")));
+
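+    // Recorded in microseconds (_throttle_time_us); exposed as a log-form
+    // public histogram, in seconds.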
+    metric_defs.emplace_back(sm::make_histogram(
+      "throttle_time",
+      [this] { return get_throttle_time(); },
+      sm::description("Throttle time histogram (in seconds)")));
+
     _metrics.add_group(
      prometheus_sanitize::metrics_name("kafka:quotas"),
      metric_defs,
@@ -308,6 +320,9 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays(
      std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
     ctx._throttled_until = now + res.request;
 
+    _probe.record_throttle_time(
+      std::chrono::duration_cast<std::chrono::microseconds>(res.request));
+
     return res;
 }
 
@@ -337,6 +352,7 @@ void snc_quota_manager::record_response(
        return;
     }
     _shard_quota.eg.use(request_size, now);
+    _probe.rec_traffic_eg(request_size);
 }
 
 ss::lowres_clock::duration
diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h
index 02428eaf161a5..f67579b7a5e1a 100644
--- a/src/v/kafka/server/snc_quota_manager.h
+++ b/src/v/kafka/server/snc_quota_manager.h
@@ -15,6 +15,7 @@
 #include "metrics/metrics.h"
 #include "seastarx.h"
 #include "utils/bottomless_token_bucket.h"
+#include "utils/log_hist.h"
 #include "utils/mutex.h"
 
 #include
@@ -48,17 +49,30 @@ class snc_quotas_probe {
     ~snc_quotas_probe() noexcept = default;
 
     void rec_balancer_step() noexcept { ++_balancer_runs; }
-    void rec_traffic_in(const size_t bytes) noexcept { _traffic_in += bytes; }
+    void rec_traffic_in(const size_t bytes) noexcept { _traffic.in += bytes; }
+    void rec_traffic_eg(const size_t bytes) noexcept { _traffic.eg += bytes; }
 
     void setup_metrics();
 
     uint64_t get_balancer_runs() const noexcept { return _balancer_runs; }
 
+    auto get_traffic_in() const { return _traffic.in; }
+    auto get_traffic_eg() const { return _traffic.eg; }
+
+    auto record_throttle_time(std::chrono::microseconds t) {
+        return _throttle_time_us.record(t.count());
+    }
+
+    auto get_throttle_time() const {
+        return _throttle_time_us.public_histogram_logform();
+    }
+
 private:
     class snc_quota_manager& _qm;
     metrics::internal_metric_groups _metrics;
     uint64_t _balancer_runs = 0;
-    size_t _traffic_in = 0;
+    ingress_egress_state<size_t> _traffic = {};
+    log_hist_public _throttle_time_us;
 };
 
 class snc_quota_context {
@@ -224,7 +238,7 @@ class snc_quota_manager
     ingress_egress_state<bottomless_token_bucket> _shard_quota;
 
     // service
-    snc_quotas_probe _probe;
+    mutable snc_quotas_probe _probe;
 };
 
 // Names exposed in this namespace are for unit test integration only

From bdc014a3cc699f60c6e89e3e50135fa531a6886c Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Wed, 21 Feb 2024 20:34:29 +0000
Subject: [PATCH 4/5] kafka/server/snc_quota_manager: Small refactor

Signed-off-by: Ben Pope
(cherry picked from commit 55d8952d9dbe457e3da577eb608146d4c560ec59)
---
 src/v/kafka/server/snc_quota_manager.cc | 31 ++++++++++++++-----------
 src/v/kafka/server/snc_quota_manager.h  |  1 +
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc
index 6cbc60c6aa0ce..1548c4db73879 100644
--- a/src/v/kafka/server/snc_quota_manager.cc
+++ b/src/v/kafka/server/snc_quota_manager.cc
@@ -176,20 +176,8 @@ snc_quota_manager::snc_quota_manager()
         _shard_quota.in.set_width(v);
         _shard_quota.eg.set_width(v);
     });
-    _kafka_quota_balancer_node_period.watch([this] {
-        if (_balancer_gate.is_closed()) {
-            return;
-        }
-        if (_balancer_timer.cancel()) {
-            // cancel() returns true only on the balancer shard
-            // because the timer is never armed on the others
-            arm_balancer_timer();
-        }
-        // if the balancer is disabled, this is where the quotas are reset to
-        // default. This needs to be called on every shard because the effective
-        // balance is updated directly in this case.
-        update_node_quota_default();
-    });
+    _kafka_quota_balancer_node_period.watch(
+      [this]() { update_balance_config(); });
     _kafka_quota_balancer_min_shard_throughput_ratio.watch(
      [this] { update_shard_quota_minimum(); });
     _kafka_quota_balancer_min_shard_throughput_bps.watch(
@@ -852,6 +840,21 @@ void snc_quota_manager::adjust_quota(
     vlog(klog.trace, "qm - Adjust quota: {} -> {}", delta, _shard_quota);
 }
 
+void snc_quota_manager::update_balance_config() {
+    if (_balancer_gate.is_closed()) {
+        return;
+    }
+    if (_balancer_timer.cancel()) {
+        // cancel() returns true only on the balancer shard
+        // because the timer is never armed on the others
+        arm_balancer_timer();
+    }
+    // if the balancer is disabled, this is where the quotas are reset to
+    // default. This needs to be called on every shard because the effective
+    // balance is updated directly in this case.
+    update_node_quota_default();
+}
+
 } // namespace kafka
 
 struct parseless_formatter {
diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h
index f67579b7a5e1a..c75e45c2c0d49 100644
--- a/src/v/kafka/server/snc_quota_manager.h
+++ b/src/v/kafka/server/snc_quota_manager.h
@@ -213,6 +213,7 @@ class snc_quota_manager
     void adjust_quota(const ingress_egress_state<quota_t>& delta) noexcept;
 
 private:
+    void update_balance_config();
     // configuration
     config::binding<std::chrono::milliseconds> _max_kafka_throttle_delay;
     ingress_egress_state<config::binding<std::optional<quota_t>>>
      _kafka_throughput_limit_node_bps;

From 1dee3d6a0ac0cf5aac281f75656b1dcdd8512c7a Mon Sep 17 00:00:00 2001
From: Ben Pope
Date: Wed, 21 Feb 2024 20:36:44 +0000
Subject: [PATCH 5/5] kafka/server: Introduce throughput throttling v2

When `kafka_throughput_throttling_v2` is enabled (true by default), use
an `ss::internal::shared_token_bucket` for requests across all shards,
instead of balancing quota between shards.

Signed-off-by: Ben Pope
(cherry picked from commit 6ddfb321cabb74d5b54357a0fec58b6c5d6e47fe)

Conflicts:
	src/v/kafka/server/snc_quota_manager.h (header include)
---
 src/v/config/configuration.cc                 |  15 +
 src/v/config/configuration.h                  |   3 +
 src/v/kafka/server/snc_quota_manager.cc       | 107 +++++-
 src/v/kafka/server/snc_quota_manager.h        |  15 +-
 .../server/tests/produce_consume_test.cc      | 354 ++++++++++++++++++
 src/v/redpanda/application.cc                 |   2 +-
 src/v/redpanda/application.h                  |   2 +
 7 files changed, 481 insertions(+), 17 deletions(-)

diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 78259c84d3bdf..5388006f46a7e 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -2777,6 +2777,21 @@ configuration::configuration()
      {.needs_restart = needs_restart::no, .visibility = visibility::user},
      std::nullopt,
      {.min = 1})
+  , kafka_throughput_throttling_v2(
+      *this,
+      "kafka_throughput_throttling_v2",
+      "Use throughput throttling based on a shared token bucket instead of "
+      "balancing quota between shards",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      true)
+  , kafka_throughput_replenish_threshold(
+      *this,
+      "kafka_throughput_replenish_threshold",
+      "Threshold for refilling the token bucket. Will be clamped between 1 and "
+      "kafka_throughput_limit_node_*_bps.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      std::nullopt,
+      {.min = 1})
   , kafka_quota_balancer_window(
      *this,
      "kafka_quota_balancer_window_ms",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index 971f65ec4d545..d7ae1d8a852c2 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -517,6 +517,9 @@ struct configuration final : public config_store {
     bounded_property<std::optional<int64_t>> kafka_throughput_limit_node_in_bps;
     bounded_property<std::optional<int64_t>> kafka_throughput_limit_node_out_bps;
+    property<bool> kafka_throughput_throttling_v2;
+    bounded_property<std::optional<int64_t>>
+      kafka_throughput_replenish_threshold;
     bounded_property<std::chrono::milliseconds> kafka_quota_balancer_window;
     bounded_property<std::chrono::milliseconds> kafka_quota_balancer_node_period;
diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc
index 1548c4db73879..a4364e72e31b7 100644
--- a/src/v/kafka/server/snc_quota_manager.cc
+++ b/src/v/kafka/server/snc_quota_manager.cc
@@ -12,6 +12,8 @@
 #include "config/configuration.h"
 #include "kafka/server/logger.h"
 #include "prometheus/prometheus_sanitize.h"
+#include "ssx/future-util.h"
+#include "ssx/sharded_ptr.h"
 #include "tristate.h"
 
 #include
@@ -138,11 +140,29 @@ quota_t node_to_shard_quota(const std::optional<quota_t> node_quota) {
     }
 }
 
+auto update_node_bucket(
+  ssx::sharded_ptr<snc_quota_manager::bucket_t>& b,
+  config::binding<std::optional<quota_t>> const& cfg) {
+    if (!cfg().has_value()) {
+        return b.reset();
+    }
+    uint64_t rate = *cfg();
+    if (b && b->rate() == rate) {
+        return ss::make_ready_future();
+    }
+    uint64_t limit = rate;
+    uint64_t threshold = config::shard_local_cfg()
+                           .kafka_throughput_replenish_threshold()
+                           .value_or(1);
+    return b.reset(rate, limit, threshold, false);
+};
+
 } // namespace
 
-snc_quota_manager::snc_quota_manager()
+snc_quota_manager::snc_quota_manager(buckets_t& node_quota)
   : _max_kafka_throttle_delay(
    config::shard_local_cfg().max_kafka_throttle_delay_ms.bind())
+  , _use_throttling_v2(
+      config::shard_local_cfg().kafka_throughput_throttling_v2.bind())
   , _kafka_throughput_limit_node_bps{
    config::shard_local_cfg().kafka_throughput_limit_node_in_bps.bind(),
    config::shard_local_cfg().kafka_throughput_limit_node_out_bps.bind()}
@@ -163,13 +183,29 @@ snc_quota_manager::snc_quota_manager()
        _kafka_quota_balancer_window()},
    .eg {node_to_shard_quota(_node_quota_default.eg),
        _kafka_quota_balancer_window()}}
+  , _node_quota{node_quota}
   , _probe(*this) {
     update_shard_quota_minimum();
-    _kafka_throughput_limit_node_bps.in.watch(
-      [this] { update_node_quota_default(); });
-    _kafka_throughput_limit_node_bps.eg.watch(
-      [this] { update_node_quota_default(); });
+    _use_throttling_v2.watch([this]() { update_balance_config(); });
+    _kafka_throughput_limit_node_bps.in.watch([this] {
+        update_node_quota_default();
+        if (ss::this_shard_id() == quota_balancer_shard) {
+            ssx::spawn_with_gate(_balancer_gate, [this] {
+                return update_node_bucket(
+                  _node_quota.in, _kafka_throughput_limit_node_bps.in);
+            });
+        }
+    });
+    _kafka_throughput_limit_node_bps.eg.watch([this] {
+        update_node_quota_default();
+        if (ss::this_shard_id() == quota_balancer_shard) {
+            ssx::spawn_with_gate(_balancer_gate, [this] {
+                return update_node_bucket(
+                  _node_quota.eg, _kafka_throughput_limit_node_bps.eg);
+            });
+        }
+    });
     _kafka_quota_balancer_window.watch([this] {
        const auto v = _kafka_quota_balancer_window();
        vlog(klog.debug, "qm - Set shard TP token bucket window: {}", v);
@@ -199,16 +235,23 @@ snc_quota_manager::snc_quota_manager()
 ss::future<> snc_quota_manager::start() {
     if (ss::this_shard_id() == quota_balancer_shard) {
        _balancer_timer.arm(
          ss::lowres_clock::now() + get_quota_balancer_node_period());
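+        // Seed the node-wide token buckets from the current configuration.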
+        co_await update_node_bucket(
+          _node_quota.in, _kafka_throughput_limit_node_bps.in);
+        co_await update_node_bucket(
+          _node_quota.eg, _kafka_throughput_limit_node_bps.eg);
     }
-    return ss::make_ready_future<>();
 }
 
 ss::future<> snc_quota_manager::stop() {
     if (ss::this_shard_id() == quota_balancer_shard) {
        _balancer_timer.cancel();
-        return _balancer_gate.close();
-    } else {
-        return ss::make_ready_future<>();
+        co_await _balancer_gate.close();
+        if (_node_quota.in) {
+            co_await _node_quota.in.stop();
+        }
+        if (_node_quota.eg) {
+            co_await _node_quota.eg.stop();
+        }
     }
 }
 
@@ -229,6 +272,17 @@ delay_t eval_delay(const bottomless_token_bucket& tb) noexcept {
     return delay_t(muldiv(-tb.tokens(), delay_t::period::den, tb.quota()));
 }
 
+/// Evaluate throttling delay required based on the state of a token bucket
+delay_t eval_node_delay(
+  const ssx::sharded_ptr<snc_quota_manager::bucket_t>& tbp) noexcept {
+    if (!tbp) {
+        return delay_t::zero();
+    }
+    auto& tb = *tbp;
+    return std::chrono::duration_cast<delay_t>(
+      tb.duration_for(tb.deficiency(tb.grab(0))));
+}
+
 } // namespace
 
 ingress_egress_state<std::optional<quota_t>>
@@ -303,9 +357,16 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays(
 
     // throttling delay the connection should be requested to throttle
     // this time
-    res.request = std::min(
-      _max_kafka_throttle_delay(),
-      std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
+    if (_use_throttling_v2()) {
+        res.request = std::min(
+          _max_kafka_throttle_delay(),
+          std::max(
+            eval_node_delay(_node_quota.in), eval_node_delay(_node_quota.eg)));
+    } else {
+        res.request = std::min(
+          _max_kafka_throttle_delay(),
+          std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
+    }
     ctx._throttled_until = now + res.request;
 
     _probe.record_throttle_time(
@@ -321,7 +382,14 @@ void snc_quota_manager::record_request_receive(
     if (ctx._exempt) {
        return;
     }
-    _shard_quota.in.use(request_size, now);
+    if (_use_throttling_v2()) {
+        if (_node_quota.in) {
+            _node_quota.in->replenish(now);
+            _node_quota.in->grab(request_size);
+        }
+    } else {
+        _shard_quota.in.use(request_size, now);
+    }
 }
 
 void snc_quota_manager::record_request_intake(
@@ -339,6 +407,14 @@ void snc_quota_manager::record_response(
     if (ctx._exempt) {
        return;
     }
-    _shard_quota.eg.use(request_size, now);
+    if (_use_throttling_v2()) {
+        if (_node_quota.eg) {
+            _node_quota.eg->replenish(now);
+            _node_quota.eg->grab(request_size);
+        }
+    } else {
+        _shard_quota.eg.use(request_size, now);
+    }
     _probe.rec_traffic_eg(request_size);
 }
 
 ss::lowres_clock::duration
@@ -844,6 +919,10 @@ void snc_quota_manager::update_balance_config() {
     if (_balancer_gate.is_closed()) {
        return;
     }
+    if (_use_throttling_v2()) {
+        _balancer_timer.cancel();
+        return;
+    }
     if (_balancer_timer.cancel()) {
        // cancel() returns true only on the balancer shard
        // because the timer is never armed on the others
diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h
index c75e45c2c0d49..b88ae43d27c74 100644
--- a/src/v/kafka/server/snc_quota_manager.h
+++ b/src/v/kafka/server/snc_quota_manager.h
@@ -14,6 +14,7 @@
 #include "config/throughput_control_group.h"
 #include "metrics/metrics.h"
 #include "seastarx.h"
+#include "ssx/sharded_ptr.h"
 #include "utils/bottomless_token_bucket.h"
 #include "utils/log_hist.h"
 #include "utils/mutex.h"
@@ -23,6 +24,7 @@
 #include
 #include
 #include
+#include <seastar/util/shared_token_bucket.hh>
 #include
 #include
@@ -105,8 +107,15 @@ class snc_quota_manager
 public:
     using clock = ss::lowres_clock;
     using quota_t = bottomless_token_bucket::quota_t;
-
-    snc_quota_manager();
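+    // Node-wide token bucket type; a single bucket is shared by all shards.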
+    using bucket_t = ss::internal::shared_token_bucket<
+      uint64_t,
+      std::ratio<1>,
+      ss::internal::capped_release::no,
+      clock>;
+    using buckets_t = kafka::ingress_egress_state<
+      ssx::sharded_ptr<bucket_t>>;
+
+    explicit snc_quota_manager(buckets_t& node_quota);
     snc_quota_manager(const snc_quota_manager&) = delete;
     snc_quota_manager& operator=(const snc_quota_manager&) = delete;
     snc_quota_manager(snc_quota_manager&&) = delete;
@@ -216,6 +225,7 @@ class snc_quota_manager
     void update_balance_config();
     // configuration
     config::binding<std::chrono::milliseconds> _max_kafka_throttle_delay;
+    config::binding<bool> _use_throttling_v2;
     ingress_egress_state<config::binding<std::optional<quota_t>>>
      _kafka_throughput_limit_node_bps;
     config::binding<std::chrono::milliseconds> _kafka_quota_balancer_window;
@@ -237,6 +247,7 @@ class snc_quota_manager
     ingress_egress_state<std::optional<quota_t>> _node_quota_default;
     ingress_egress_state<quota_t> _shard_quota_minimum;
     ingress_egress_state<bottomless_token_bucket> _shard_quota;
+    buckets_t& _node_quota;
 
     // service
     mutable snc_quotas_probe _probe;
diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc
index d49e368d717ba..c9f5061b5da03 100644
--- a/src/v/kafka/server/tests/produce_consume_test.cc
+++ b/src/v/kafka/server/tests/produce_consume_test.cc
@@ -24,7 +24,13 @@
 #include "test_utils/async.h"
 #include "test_utils/fixture.h"
 
+#include <seastar/core/map_reduce.hh>
+#include <seastar/core/metrics_types.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/thread.hh>
+
 #include
+#include <boost/range/irange.hpp>
 
 #include
@@ -241,7 +247,102 @@ single_batch(model::partition_id p_id, const size_t volume) {
 
 namespace ch = std::chrono;
 
+namespace {
+
+enum class execution { seq, par };
+
+/// Runs func in a thread on the given shard
+template<typename Func>
+auto async_submit_to(unsigned int shard, Func&& func) {
+    return ss::smp::submit_to(shard, [&func, shard]() {
+        return ss::async(std::forward<Func>(func), shard);
+    });
+}
+
+/// std::transform_reduce, but with an api that matches ss::map_reduce
+///
+/// Mapper must be synchronous
+template<typename Range, typename Mapper, typename Initial, typename Reduce>
+auto transform_reduce(
+  Range&& rng, Mapper&& mapper, Initial&& initial, Reduce&& reduce) {
+    return std::transform_reduce(
+      rng.begin(),
+      rng.end(),
+      std::forward<Initial>(initial),
+      std::forward<Reduce>(reduce),
+      std::forward<Mapper>(mapper));
+}
+
+template<typename Mapper, typename Initial, typename Reduce>
+auto map_reduce_thread_per_core(
+  Mapper&& mapper, Initial&& initial, Reduce&& reduce) {
+    return ss::map_reduce(
+      boost::irange(0u, ss::smp::count),
+      [&mapper](auto shard) {
+          return async_submit_to(shard, std::forward<Mapper>(mapper));
+      },
+      std::forward<Initial>(initial),
+      std::forward<Reduce>(reduce));
+}
+
+template<typename Mapper, typename Initial, typename Reduce>
+auto transform_reduce_thread_per_core(
+  Mapper&& mapper, Initial&& initial, Reduce&& reduce) {
+    return transform_reduce(
+      boost::irange(0u, ss::smp::count),
+      [&mapper](auto shard) {
+          return async_submit_to(shard, std::forward<Mapper>(mapper)).get();
+      },
+      std::forward<Initial>(initial),
+      std::forward<Reduce>(reduce));
+}
+
+/// Run mapper in a thread on each core, and then reduce on the original core,
+/// returning a synchronous result
+///
+/// execution::par - Run mapper on each core in parallel
+/// execution::seq - Run mapper on each core sequentially
+template<typename Mapper, typename Initial, typename Reduce>
+auto transform_reduce_thread_per_core(
+  execution policy, Mapper&& mapper, Initial&& initial, Reduce&& reduce) {
+    switch (policy) {
+    case execution::seq:
+        return transform_reduce_thread_per_core(
+          std::forward<Mapper>(mapper),
+          std::forward<Initial>(initial),
+          std::forward<Reduce>(reduce));
+    case execution::par:
+        return map_reduce_thread_per_core(
+          std::forward<Mapper>(mapper),
+          std::forward<Initial>(initial),
+          std::forward<Reduce>(reduce))
+          .get();
+    }
+}
+
+/// Return a tuple of the result of applying BinaryOp element-wise to the two
+/// tuples
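+/// e.g. tuple_binary_op<std::plus<>>{}(std::tuple{1, 2}, std::tuple{3, 4})
+/// yields std::tuple{4, 6}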
+template<typename BinaryOp>
+struct tuple_binary_op {
+    auto operator()(auto&& t1, auto&& t2) const {
+        return std::apply(
+          [&](auto&&... args1) {
+              return std::apply(
+                [&](auto&&... args2) {
+                    return std::make_tuple(BinaryOp{}(
+                      std::forward<decltype(args1)>(args1),
+                      std::forward<decltype(args2)>(args2))...);
+                },
+                std::forward<decltype(t2)>(t2));
+          },
+          std::forward<decltype(t1)>(t1));
+    }
+};
+
+} // namespace
+
 struct throughput_limits_fixure : prod_consume_fixture {
+    using honour_throttle = ss::bool_class<struct honour_throttle_tag>;
+
     static constexpr size_t kafka_packet_in_overhead = 127;
     static constexpr size_t kafka_packet_eg_overhead = 62;
 
@@ -402,13 +503,265 @@ struct throughput_limits_fixure : prod_consume_fixture {
        << (stop - start - time_estimated) * 100.0 / time_estimated << "%");
        return kafka_out_data_len;
     }
+
+    auto do_produce(
+      unsigned i,
+      size_t batches_cnt,
+      size_t batch_size,
+      honour_throttle honour_throttle) {
+        const model::partition_id p_id{i};
+        size_t data_len{0};
+        size_t total_len{0};
+        ch::milliseconds total_throttle_time{};
+        ch::milliseconds throttle_time{};
+        for (size_t k{0}; k != batches_cnt; ++k) {
+            if (honour_throttle && throttle_time > ch::milliseconds::zero()) {
+                ss::sleep(throttle_time).get();
+            }
+            auto res
+              = produce_raw(producers[i], single_batch(p_id, batch_size)).get();
+            throttle_time = res.data.throttle_time_ms;
+            total_throttle_time += throttle_time;
+            data_len += batch_size;
+            total_len += batch_size + kafka_packet_in_overhead;
+        }
+        return std::make_tuple(data_len, total_len, total_throttle_time);
+    }
+
+    auto do_consume(
+      unsigned int i, size_t data_cap, honour_throttle honour_throttle) {
+        const model::partition_id p_id{i};
+        size_t data_len{0};
+        size_t total_len{0};
+        ch::milliseconds total_throttle_time{};
+        ch::milliseconds throttle_time{};
+        while (data_len < data_cap) {
+            if (honour_throttle && throttle_time > ch::milliseconds::zero()) {
+                ss::sleep(throttle_time).get();
+            }
+            const auto fetch_resp = fetch_next(consumers[i], p_id).get();
+            BOOST_REQUIRE_EQUAL(fetch_resp.data.topics.size(), 1);
+            BOOST_REQUIRE_EQUAL(fetch_resp.data.topics[0].partitions.size(), 1);
+            BOOST_TEST_REQUIRE(
+              fetch_resp.data.topics[0].partitions[0].records.has_value());
+            const auto kafka_data_len = fetch_resp.data.topics[0]
+                                          .partitions[0]
+                                          .records.value()
+                                          .size_bytes();
+            throttle_time = fetch_resp.data.throttle_time_ms;
+            total_throttle_time += throttle_time;
+            data_len += kafka_data_len;
+            total_len += kafka_data_len + kafka_packet_eg_overhead;
+        }
+        return std::make_tuple(data_len, total_len, total_throttle_time);
+    }
+
+    auto get_recorded_traffic() {
+        return app.snc_quota_mgr
+          .map_reduce0(
+            [](kafka::snc_quota_manager& snc) {
+                return std::make_tuple(
+                  snc.get_snc_quotas_probe().get_traffic_in(),
+                  snc.get_snc_quotas_probe().get_traffic_eg());
+            },
+            std::make_tuple(size_t{0}, size_t{0}),
+            tuple_binary_op<std::plus<>>{})
+          .get();
+    };
+
+    auto get_throttle_time() {
+        return app.snc_quota_mgr
+          .map_reduce0(
+            [](kafka::snc_quota_manager& snc) {
+                return snc.get_snc_quotas_probe().get_throttle_time();
+            },
+            ss::metrics::histogram{},
+            std::plus{})
+          .get();
+    };
+
+    void test_throughput(honour_throttle honour_throttle, execution policy) {
+        using clock = kafka::snc_quota_manager::clock;
+        // configure
+        constexpr int64_t rate_limit_in = 9_KiB;
+        constexpr int64_t rate_limit_out = 7_KiB;
+        constexpr size_t batch_size = 256;
+        constexpr auto max_rate = std::max(rate_limit_in, rate_limit_out);
+        constexpr auto min_rate = std::min(rate_limit_in, rate_limit_out);
+        const double expected_max_throttle
+          = (double(batch_size) / min_rate)
+            * (policy == execution::seq ? 1 : ss::smp::count)
+            * (honour_throttle ? 1 : 2);
+        const size_t tolerance_percent = 8;
+        config_set("kafka_throughput_throttling_v2", true);
+        config_set(
+          "kafka_throughput_limit_node_in_bps",
+          std::make_optional(rate_limit_in));
+        config_set(
+          "kafka_throughput_limit_node_out_bps",
+          std::make_optional(rate_limit_out));
+        config_set("fetch_max_bytes", batch_size);
+        config_set("max_kafka_throttle_delay_ms", 30'000ms);
+
+        wait_for_controller_leadership().get();
+        start(ss::smp::count);
+
+        // PRODUCE smaller batches for 5s per client
+        const auto batches_cnt = (5s).count() * rate_limit_in
+                                 / (batch_size + kafka_packet_in_overhead);
+
+        constexpr auto now = []() {
+            clock::update();
+            return clock::now();
+        };
+
+        BOOST_TEST_MESSAGE("Warming up");
+        ss::sleep(
+          produce_raw(
+            producers[0], single_batch(model::partition_id{0}, max_rate))
+            .get()
+            .data.throttle_time_ms)
+          .get();
+
+        auto [init_recorded_in, init_recorded_eg] = get_recorded_traffic();
+
+        BOOST_TEST_MESSAGE("Producing");
+        auto start_in = now();
+        auto [kafka_in_data_len, kafka_in_total_len, throttle_time_in]
+          = transform_reduce_thread_per_core(
+            policy,
+            [&](auto i) {
+                return do_produce(i, batches_cnt, batch_size, honour_throttle);
+            },
+            std::make_tuple(size_t{0}, size_t{0}, ch::milliseconds{0}),
+            tuple_binary_op<std::plus<>>{});
+        auto duration_in = now() - start_in;
+        auto [produce_recorded_in, produce_recorded_eg]
+          = tuple_binary_op<std::minus<>>{}(
+            get_recorded_traffic(),
+            std::make_tuple(init_recorded_in, init_recorded_eg));
+
+        BOOST_TEST_MESSAGE("Finished Producing, Warming up");
+
+        // Consume the warmup batch and wait for throttle time
+        ss::sleep(fetch_next(consumers[0], model::partition_id{0})
+                    .get()
+                    .data.throttle_time_ms)
+          .get();
+        std::tie(init_recorded_in, init_recorded_eg) = get_recorded_traffic();
+
+        BOOST_TEST_MESSAGE("Consuming");
+        // CONSUME
+        auto start_eg = now();
+        auto [kafka_eg_data_len, kafka_eg_total_len, throttle_time_out]
+          = transform_reduce_thread_per_core(
+            policy,
+            [&](auto i) {
+                const model::partition_id p_id{i};
+                const auto data_cap = (kafka_in_data_len / ss::smp::count)
+                                      - batch_size * 2;
+                return do_consume(i, data_cap, honour_throttle);
+            },
+            std::make_tuple(size_t{0}, size_t{0}, ch::milliseconds{0}),
+            tuple_binary_op<std::plus<>>{});
+        auto duration_eg = now() - start_eg;
+        BOOST_TEST_MESSAGE("Finished Consuming");
+        auto [consume_recorded_in, consume_recorded_eg]
+          = tuple_binary_op<std::minus<>>{}(
+            get_recorded_traffic(),
+            std::make_tuple(init_recorded_in, init_recorded_eg));
+        auto throttle_hist = get_throttle_time();
+
+        // otherwise test is not valid:
+        BOOST_REQUIRE_GT(kafka_in_data_len, kafka_eg_data_len);
+
+        BOOST_CHECK_GT(produce_recorded_in, produce_recorded_eg);
+        BOOST_CHECK_LT(consume_recorded_in, consume_recorded_eg);
+
+        BOOST_CHECK_EQUAL(produce_recorded_in, kafka_in_total_len);
+        BOOST_CHECK_EQUAL(consume_recorded_eg, kafka_eg_total_len);
+
+        const auto time_estimated_in = std::chrono::milliseconds{
+          kafka_in_total_len * 1000 / rate_limit_in};
+
+        const auto time_estimated_eg = std::chrono::milliseconds{
+          kafka_eg_total_len * 1000 / rate_limit_out};
+
+        BOOST_TEST_CHECK(
+          abs(duration_in - time_estimated_in)
+            < time_estimated_in * tolerance_percent / 100,
+          "Total ingress time: stop-start["
+            << duration_in << "] ≈ time_estimated[" << time_estimated_in
+            << "] ±" << tolerance_percent
+            << "%, error: " << std::setprecision(3)
+            << (duration_in - time_estimated_in) * 100.0 / time_estimated_in
+            << "%");
+
+        BOOST_TEST_CHECK(
+          abs(duration_eg - time_estimated_eg)
+            < time_estimated_eg * tolerance_percent / 100,
+          "Total egress time: stop-start["
+            << duration_eg << "] ≈ time_estimated[" << time_estimated_eg
+            << "] ±" << tolerance_percent
+            << "%, error: " << std::setprecision(3)
+            << (duration_eg - time_estimated_eg) * 100.0 / time_estimated_eg
+            << "%");
+
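+        // Estimate a percentile from the seastar histogram by interpolating
+        // linearly within the bucket that contains it.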
+        constexpr auto get_throttle_percentile =
+          [](ss::metrics::histogram const& h, int p) {
+              auto count_p = double(h.sample_count) * p / 100;
+              auto lb = std::ranges::lower_bound(
+                h.buckets,
+                count_p,
+                std::less{},
+                &ss::metrics::histogram_bucket::count);
+              if (lb != h.buckets.begin()) {
+                  auto prev = std::prev(lb);
+                  double ratio = (count_p - prev->count)
+                                 / (lb->count - prev->count);
+                  return prev->upper_bound
+                         + ratio * (lb->upper_bound - prev->upper_bound);
+              }
+              return lb->upper_bound;
+          };
+
+        const auto expected_bucket = std::ranges::lower_bound(
+          throttle_hist.buckets,
+          expected_max_throttle * 2, // add some leeway
+          std::less{},
+          &ss::metrics::histogram_bucket::upper_bound);
+
+        auto throttle_90 = get_throttle_percentile(throttle_hist, 90);
+        BOOST_TEST_CHECK(throttle_90 <= expected_bucket->upper_bound);
+    }
 };
 
+FIXTURE_TEST(
+  test_node_throughput_v2_limits_no_throttle_seq, throughput_limits_fixure) {
+    test_throughput(honour_throttle::no, execution::seq);
+}
+
+FIXTURE_TEST(
+  test_node_throughput_v2_limits_with_throttle_seq, throughput_limits_fixure) {
+    test_throughput(honour_throttle::yes, execution::seq);
+}
+
+FIXTURE_TEST(
+  test_node_throughput_v2_limits_no_throttle_par, throughput_limits_fixure) {
+    test_throughput(honour_throttle::no, execution::par);
+}
+
+FIXTURE_TEST(
+  test_node_throughput_v2_limits_with_throttle_par, throughput_limits_fixure) {
+    test_throughput(honour_throttle::yes, execution::par);
+}
+
 FIXTURE_TEST(test_node_throughput_limits_static, throughput_limits_fixure) {
     // configure
     constexpr int64_t pershard_rate_limit_in = 9_KiB;
     constexpr int64_t pershard_rate_limit_out = 7_KiB;
     constexpr size_t batch_size = 256;
+    config_set("kafka_throughput_throttling_v2", false);
     config_set(
      "kafka_throughput_limit_node_in_bps",
      std::make_optional(pershard_rate_limit_in * ss::smp::count));
@@ -442,6 +795,7 @@ FIXTURE_TEST(test_node_throughput_limits_balanced, throughput_limits_fixure) {
     // configure
     constexpr int64_t rate_limit_in = 9_KiB;
     constexpr int64_t rate_limit_out = 7_KiB;
     constexpr size_t batch_size = 256;
+    config_set("kafka_throughput_throttling_v2", false);
     config_set(
      "kafka_throughput_limit_node_in_bps", std::make_optional(rate_limit_in));
     config_set(
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index cc089f6c09483..a49ab7042f0e6 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -1586,7 +1586,7 @@ void application::wire_up_redpanda_services(
     // metrics and quota management
     syschecks::systemd_message("Adding kafka quota managers").get();
     construct_service(quota_mgr).get();
-    construct_service(snc_quota_mgr).get();
+    construct_service(snc_quota_mgr, std::ref(snc_node_quota)).get();
 
     syschecks::systemd_message("Creating auditing subsystem").get();
     construct_service(
diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h
index bc5f9307aaa0b..68d2824e3204d 100644
--- a/src/v/redpanda/application.h
+++ b/src/v/redpanda/application.h
@@ -29,6 +29,7 @@
 #include "kafka/client/configuration.h"
 #include "kafka/client/fwd.h"
 #include "kafka/server/fwd.h"
+#include "kafka/server/snc_quota_manager.h"
 #include "metrics/aggregate_metrics_watcher.h"
 #include "metrics/metrics.h"
"net/conn_quota.h" @@ -152,6 +153,7 @@ class application { ss::sharded coordinator_ntp_mapper; ss::sharded group_router; ss::sharded quota_mgr; + kafka::snc_quota_manager::buckets_t snc_node_quota; ss::sharded snc_quota_mgr; ss::sharded rm_group_frontend; ss::sharded usage_manager;