diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 78259c84d3bdf..5388006f46a7e 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2777,6 +2777,21 @@ configuration::configuration() {.needs_restart = needs_restart::no, .visibility = visibility::user}, std::nullopt, {.min = 1}) + , kafka_throughput_throttling_v2( + *this, + "kafka_throughput_throttling_v2", + "Use throughput throttling based on a shared token bucket instead of " + "balancing quota between shards", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + true) + , kafka_throughput_replenish_threshold( + *this, + "kafka_throughput_replenish_threshold", + "Threshold for refilling the token bucket. Will be clamped between 1 and " + "kafka_throughput_limit_node_*_bps.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + std::nullopt, + {.min = 1}) , kafka_quota_balancer_window( *this, "kafka_quota_balancer_window_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 971f65ec4d545..d7ae1d8a852c2 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -517,6 +517,9 @@ struct configuration final : public config_store { bounded_property> kafka_throughput_limit_node_in_bps; bounded_property> kafka_throughput_limit_node_out_bps; + property kafka_throughput_throttling_v2; + bounded_property> + kafka_throughput_replenish_threshold; bounded_property kafka_quota_balancer_window; bounded_property kafka_quota_balancer_node_period; diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 1548c4db73879..a4364e72e31b7 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -12,6 +12,8 @@ #include "config/configuration.h" #include "kafka/server/logger.h" #include "prometheus/prometheus_sanitize.h" +#include "ssx/future-util.h" +#include "ssx/sharded_ptr.h" #include "tristate.h" #include @@ -138,11 +140,29 @@ quota_t node_to_shard_quota(const std::optional node_quota) { } } +auto update_node_bucket( + ssx::sharded_ptr& b, + config::binding> const& cfg) { + if (!cfg().has_value()) { + return b.reset(); + } + uint64_t rate = *cfg(); + if (b && b->rate() == rate) { + return ss::make_ready_future(); + } + uint64_t limit = rate; + uint64_t threshold = config::shard_local_cfg() + .kafka_throughput_replenish_threshold() + .value_or(1); + return b.reset(rate, limit, threshold, false); +}; + } // namespace -snc_quota_manager::snc_quota_manager() +snc_quota_manager::snc_quota_manager(buckets_t& node_quota) : _max_kafka_throttle_delay( config::shard_local_cfg().max_kafka_throttle_delay_ms.bind()) + , _use_throttling_v2(config::shard_local_cfg().kafka_throughput_throttling_v2) , _kafka_throughput_limit_node_bps{ config::shard_local_cfg().kafka_throughput_limit_node_in_bps.bind(), config::shard_local_cfg().kafka_throughput_limit_node_out_bps.bind()} @@ -163,13 +183,29 @@ snc_quota_manager::snc_quota_manager() _kafka_quota_balancer_window()}, .eg {node_to_shard_quota(_node_quota_default.eg), _kafka_quota_balancer_window()}} + , _node_quota{node_quota} , _probe(*this) { update_shard_quota_minimum(); - _kafka_throughput_limit_node_bps.in.watch( - [this] { update_node_quota_default(); }); - _kafka_throughput_limit_node_bps.eg.watch( - [this] { update_node_quota_default(); }); + _use_throttling_v2.watch([this]() { update_balance_config(); }); + _kafka_throughput_limit_node_bps.in.watch([this] { + update_node_quota_default(); + if (ss::this_shard_id() == quota_balancer_shard) { + ssx::spawn_with_gate(_balancer_gate, [this] { + return update_node_bucket( + _node_quota.in, _kafka_throughput_limit_node_bps.in); + }); + } + }); + _kafka_throughput_limit_node_bps.eg.watch([this] { + update_node_quota_default(); + if (ss::this_shard_id() == quota_balancer_shard) { + ssx::spawn_with_gate(_balancer_gate, [this] { + return update_node_bucket( + _node_quota.eg, _kafka_throughput_limit_node_bps.eg); + }); + } + }); _kafka_quota_balancer_window.watch([this] { const auto v = _kafka_quota_balancer_window(); vlog(klog.debug, "qm - Set shard TP token bucket window: {}", v); @@ -199,16 +235,23 @@ ss::future<> snc_quota_manager::start() { if (ss::this_shard_id() == quota_balancer_shard) { _balancer_timer.arm( ss::lowres_clock::now() + get_quota_balancer_node_period()); + co_await update_node_bucket( + _node_quota.in, _kafka_throughput_limit_node_bps.in); + co_await update_node_bucket( + _node_quota.eg, _kafka_throughput_limit_node_bps.eg); } - return ss::make_ready_future<>(); } ss::future<> snc_quota_manager::stop() { if (ss::this_shard_id() == quota_balancer_shard) { _balancer_timer.cancel(); - return _balancer_gate.close(); - } else { - return ss::make_ready_future<>(); + co_await _balancer_gate.close(); + if (_node_quota.in) { + co_await _node_quota.in.stop(); + } + if (_node_quota.eg) { + co_await _node_quota.eg.stop(); + } } } @@ -229,6 +272,17 @@ delay_t eval_delay(const bottomless_token_bucket& tb) noexcept { return delay_t(muldiv(-tb.tokens(), delay_t::period::den, tb.quota())); } +/// Evaluate throttling delay required based on the state of a token bucket +delay_t eval_node_delay( + const ssx::sharded_ptr& tbp) noexcept { + if (!tbp) { + return delay_t::zero(); + } + auto& tb = *tbp; + return std::chrono::duration_cast( + tb.duration_for(tb.deficiency(tb.grab(0)))); +} + } // namespace ingress_egress_state> @@ -303,9 +357,16 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays( // throttling delay the connection should be requested to throttle // this time - res.request = std::min( - _max_kafka_throttle_delay(), - std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg))); + if (_use_throttling_v2()) { + res.request = std::min( + _max_kafka_throttle_delay(), + std::max( + eval_node_delay(_node_quota.in), eval_node_delay(_node_quota.eg))); + } else { + res.request = std::min( + _max_kafka_throttle_delay(), + std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg))); + } ctx._throttled_until = now + res.request; _probe.record_throttle_time( @@ -321,7 +382,14 @@ void snc_quota_manager::record_request_receive( if (ctx._exempt) { return; } - _shard_quota.in.use(request_size, now); + if (_use_throttling_v2()) { + if (_node_quota.in) { + _node_quota.in->replenish(now); + _node_quota.in->grab(request_size); + } + } else { + _shard_quota.in.use(request_size, now); + } } void snc_quota_manager::record_request_intake( @@ -339,7 +407,14 @@ void snc_quota_manager::record_response( if (ctx._exempt) { return; } - _shard_quota.eg.use(request_size, now); + if (_use_throttling_v2()) { + if (_node_quota.eg) { + _node_quota.eg->replenish(now); + _node_quota.eg->grab(request_size); + } + } else { + _shard_quota.eg.use(request_size, now); + } _probe.rec_traffic_eg(request_size); } @@ -844,6 +919,10 @@ void snc_quota_manager::update_balance_config() { if (_balancer_gate.is_closed()) { return; } + if (_use_throttling_v2()) { + _balancer_timer.cancel(); + return; + } if (_balancer_timer.cancel()) { // cancel() returns true only on the balancer shard // because the timer is never armed on the others diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h index c75e45c2c0d49..b88ae43d27c74 100644 --- a/src/v/kafka/server/snc_quota_manager.h +++ b/src/v/kafka/server/snc_quota_manager.h @@ -14,6 +14,7 @@ #include "config/throughput_control_group.h" #include "metrics/metrics.h" #include "seastarx.h" +#include "ssx/sharded_ptr.h" #include "utils/bottomless_token_bucket.h" #include "utils/log_hist.h" #include "utils/mutex.h" @@ -23,6 +24,7 @@ #include #include #include +#include #include #include @@ -105,8 +107,15 @@ class snc_quota_manager public: using clock = ss::lowres_clock; using quota_t = bottomless_token_bucket::quota_t; - - snc_quota_manager(); + using bucket_t = ss::internal::shared_token_bucket< + uint64_t, + std::ratio<1>, + ss::internal::capped_release::no, + clock>; + using buckets_t = kafka::ingress_egress_state< + ssx::sharded_ptr>; + + explicit snc_quota_manager(buckets_t& node_quota); snc_quota_manager(const snc_quota_manager&) = delete; snc_quota_manager& operator=(const snc_quota_manager&) = delete; snc_quota_manager(snc_quota_manager&&) = delete; @@ -216,6 +225,7 @@ class snc_quota_manager void update_balance_config(); // configuration config::binding _max_kafka_throttle_delay; + config::binding _use_throttling_v2; ingress_egress_state>> _kafka_throughput_limit_node_bps; config::binding _kafka_quota_balancer_window; @@ -237,6 +247,7 @@ class snc_quota_manager ingress_egress_state> _node_quota_default; ingress_egress_state _shard_quota_minimum; ingress_egress_state _shard_quota; + buckets_t& _node_quota; // service mutable snc_quotas_probe _probe; diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc index d49e368d717ba..c9f5061b5da03 100644 --- a/src/v/kafka/server/tests/produce_consume_test.cc +++ b/src/v/kafka/server/tests/produce_consume_test.cc @@ -24,7 +24,13 @@ #include "test_utils/async.h" #include "test_utils/fixture.h" +#include +#include +#include +#include + #include +#include #include @@ -241,7 +247,102 @@ single_batch(model::partition_id p_id, const size_t volume) { namespace ch = std::chrono; +namespace { + +enum class execution { seq, par }; + +/// Runs func in a thread on the given shard +template +auto async_submit_to(unsigned int shard, Func&& func) { + return ss::smp::submit_to(shard, [&func, shard]() { + return ss::async(std::forward(func), shard); + }); +} + +/// std::transform_reduce, but with an api that matches ss::map_reduce +/// +/// Mapper must be synchronous +template +auto transform_reduce( + Range&& rng, Mapper&& mapper, Initial&& initial, Reduce&& reduce) { + return std::transform_reduce( + rng.begin(), + rng.end(), + std::forward(initial), + std::forward(reduce), + std::forward(mapper)); +} + +template +auto map_reduce_thread_per_core( + Mapper&& mapper, Initial&& initial, Reduce&& reduce) { + return ss::map_reduce( + boost::irange(0u, ss::smp::count), + [&mapper](auto shard) { + return async_submit_to(shard, std::forward(mapper)); + }, + std::forward(initial), + std::forward(reduce)); +} + +template +auto transform_reduce_thread_per_core( + Mapper&& mapper, Initial&& initial, Reduce&& reduce) { + return transform_reduce( + boost::irange(0u, ss::smp::count), + [&mapper](auto shard) { + return async_submit_to(shard, std::forward(mapper)).get(); + }, + std::forward(initial), + std::forward(reduce)); +} + +/// Run mapper in a thread on each core, and then reduce on the original core, +/// returning a synchrous result +/// +/// execution::par - Run mapper on each core in parallel +/// execution::par - Run mapper on each core sequentially +template +auto transform_reduce_thread_per_core( + execution policy, Mapper&& mapper, Initial&& initial, Reduce&& reduce) { + switch (policy) { + case execution::seq: + return transform_reduce_thread_per_core( + std::forward(mapper), + std::forward(initial), + std::forward(reduce)); + case execution::par: + return map_reduce_thread_per_core( + std::forward(mapper), + std::forward(initial), + std::forward(reduce)) + .get(); + } +} + +/// Return a tuple of the result of applying BinaryOp element-wise to each tuple +template +struct tuple_binary_op { + auto operator()(auto&& t1, auto&& t2) const { + return std::apply( + [&](auto&&... args1) { + return std::apply( + [&](auto&&... args2) { + return std::make_tuple(BinaryOp{}( + std::forward(args1), + std::forward(args2))...); + }, + std::forward(t2)); + }, + std::forward(t1)); + } +}; + +} // namespace + struct throughput_limits_fixure : prod_consume_fixture { + using honour_throttle = ss::bool_class; + static constexpr size_t kafka_packet_in_overhead = 127; static constexpr size_t kafka_packet_eg_overhead = 62; @@ -402,13 +503,265 @@ struct throughput_limits_fixure : prod_consume_fixture { << (stop - start - time_estimated) * 100.0 / time_estimated << "%"); return kafka_out_data_len; } + + auto do_produce( + unsigned i, + size_t batches_cnt, + size_t batch_size, + honour_throttle honour_throttle) { + const model::partition_id p_id{i}; + size_t data_len{0}; + size_t total_len{0}; + ch::milliseconds total_throttle_time{}; + ch::milliseconds throttle_time{}; + for (size_t k{0}; k != batches_cnt; ++k) { + if (honour_throttle && throttle_time > ch::milliseconds::zero()) { + ss::sleep(throttle_time).get(); + } + auto res + = produce_raw(producers[i], single_batch(p_id, batch_size)).get(); + throttle_time = res.data.throttle_time_ms; + total_throttle_time += throttle_time; + data_len += batch_size; + total_len += batch_size + kafka_packet_in_overhead; + } + return std::make_tuple(data_len, total_len, total_throttle_time); + } + + auto do_consume( + unsigned int i, size_t data_cap, honour_throttle honour_throttle) { + const model::partition_id p_id{i}; + size_t data_len{0}; + size_t total_len{0}; + ch::milliseconds total_throttle_time{}; + ch::milliseconds throttle_time{}; + while (data_len < data_cap) { + if (honour_throttle && throttle_time > ch::milliseconds::zero()) { + ss::sleep(throttle_time).get(); + } + const auto fetch_resp = fetch_next(consumers[i], p_id).get(); + BOOST_REQUIRE_EQUAL(fetch_resp.data.topics.size(), 1); + BOOST_REQUIRE_EQUAL(fetch_resp.data.topics[0].partitions.size(), 1); + BOOST_TEST_REQUIRE( + fetch_resp.data.topics[0].partitions[0].records.has_value()); + const auto kafka_data_len = fetch_resp.data.topics[0] + .partitions[0] + .records.value() + .size_bytes(); + throttle_time = fetch_resp.data.throttle_time_ms; + total_throttle_time += throttle_time; + data_len += kafka_data_len; + total_len += kafka_data_len + kafka_packet_eg_overhead; + } + return std::make_tuple(data_len, total_len, total_throttle_time); + } + + auto get_recorded_traffic() { + return app.snc_quota_mgr + .map_reduce0( + [](kafka::snc_quota_manager& snc) { + return std::make_tuple( + snc.get_snc_quotas_probe().get_traffic_in(), + snc.get_snc_quotas_probe().get_traffic_eg()); + }, + std::make_tuple(size_t{0}, size_t{0}), + tuple_binary_op>{}) + .get(); + }; + + auto get_throttle_time() { + return app.snc_quota_mgr + .map_reduce0( + [](kafka::snc_quota_manager& snc) { + return snc.get_snc_quotas_probe().get_throttle_time(); + }, + ss::metrics::histogram{}, + std::plus{}) + .get(); + }; + + void test_throughput(honour_throttle honour_throttle, execution policy) { + using clock = kafka::snc_quota_manager::clock; + // configure + constexpr int64_t rate_limit_in = 9_KiB; + constexpr int64_t rate_limit_out = 7_KiB; + constexpr size_t batch_size = 256; + constexpr auto max_rate = std::max(rate_limit_in, rate_limit_out); + constexpr auto min_rate = std::min(rate_limit_in, rate_limit_out); + const double expected_max_throttle + = (double(batch_size) / min_rate) + * (policy == execution::seq ? 1 : ss::smp::count) + * (honour_throttle ? 1 : 2); + const size_t tolerance_percent = 8; + config_set("kafka_throughput_throttling_v2", true); + config_set( + "kafka_throughput_limit_node_in_bps", + std::make_optional(rate_limit_in)); + config_set( + "kafka_throughput_limit_node_out_bps", + std::make_optional(rate_limit_out)); + config_set("fetch_max_bytes", batch_size); + config_set("max_kafka_throttle_delay_ms", 30'000ms); + + wait_for_controller_leadership().get(); + start(ss::smp::count); + + // PRODUCE smaller batches for 5s per client + const auto batches_cnt = (5s).count() * rate_limit_in + / (batch_size + kafka_packet_in_overhead); + + constexpr auto now = []() { + clock::update(); + return clock::now(); + }; + + BOOST_TEST_MESSAGE("Warming up"); + ss::sleep( + produce_raw( + producers[0], single_batch(model::partition_id{0}, max_rate)) + .get() + .data.throttle_time_ms) + .get(); + + auto [init_recorded_in, init_recorded_eg] = get_recorded_traffic(); + + BOOST_TEST_MESSAGE("Producing"); + auto start_in = now(); + auto [kafka_in_data_len, kafka_in_total_len, throttle_time_in] + = transform_reduce_thread_per_core( + policy, + [&](auto i) { + return do_produce(i, batches_cnt, batch_size, honour_throttle); + }, + std::make_tuple(size_t{0}, size_t{0}, ch::milliseconds{0}), + tuple_binary_op>{}); + auto duration_in = now() - start_in; + auto [produce_recorded_in, produce_recorded_eg] + = tuple_binary_op>{}( + get_recorded_traffic(), + std::make_tuple(init_recorded_in, init_recorded_eg)); + + BOOST_TEST_MESSAGE("Finished Producing, Warming up"); + + // Consume the warmup batch and wait for throttle time + ss::sleep(fetch_next(consumers[0], model::partition_id{0}) + .get() + .data.throttle_time_ms) + .get(); + std::tie(init_recorded_in, init_recorded_eg) = get_recorded_traffic(); + + BOOST_TEST_MESSAGE("Consuming"); + // CONSUME + auto start_eg = now(); + auto [kafka_eg_data_len, kafka_eg_total_len, throttle_time_out] + = transform_reduce_thread_per_core( + policy, + [&](auto i) { + const model::partition_id p_id{i}; + const auto data_cap = (kafka_in_data_len / ss::smp::count) + - batch_size * 2; + return do_consume(i, data_cap, honour_throttle); + }, + std::make_tuple(size_t{0}, size_t{0}, ch::milliseconds{0}), + tuple_binary_op>{}); + auto duration_eg = now() - start_eg; + BOOST_TEST_MESSAGE("Finished Consuming"); + auto [consume_recorded_in, consume_recorded_eg] + = tuple_binary_op>{}( + get_recorded_traffic(), + std::make_tuple(init_recorded_in, init_recorded_eg)); + auto throttle_hist = get_throttle_time(); + + // otherwise test is not valid: + BOOST_REQUIRE_GT(kafka_in_data_len, kafka_eg_data_len); + + BOOST_CHECK_GT(produce_recorded_in, produce_recorded_eg); + BOOST_CHECK_LT(consume_recorded_in, consume_recorded_eg); + + BOOST_CHECK_EQUAL(produce_recorded_in, kafka_in_total_len); + BOOST_CHECK_EQUAL(consume_recorded_eg, kafka_eg_total_len); + + const auto time_estimated_in = std::chrono::milliseconds{ + kafka_in_total_len * 1000 / rate_limit_in}; + + const auto time_estimated_eg = std::chrono::milliseconds{ + kafka_eg_total_len * 1000 / rate_limit_out}; + + BOOST_TEST_CHECK( + abs(duration_in - time_estimated_in) + < time_estimated_in * tolerance_percent / 100, + "Total ingress time: stop-start[" + << duration_in << "] ≈ time_estimated[" << time_estimated_in + << "] ±" << tolerance_percent + << "%, error: " << std::setprecision(3) + << (duration_in - time_estimated_in) * 100.0 / time_estimated_in + << "%"); + + BOOST_TEST_CHECK( + abs(duration_eg - time_estimated_eg) + < time_estimated_eg * tolerance_percent / 100, + "Total egress time: stop-start[" + << duration_eg << "] ≈ time_estimated[" << time_estimated_eg + << "] ±" << tolerance_percent + << "%, error: " << std::setprecision(3) + << (duration_eg - time_estimated_eg) * 100.0 / time_estimated_eg + << "%"); + + constexpr auto get_throttle_percentile = + [](ss::metrics::histogram const& h, int p) { + auto count_p = double(h.sample_count) * p / 100; + auto lb = std::ranges::lower_bound( + h.buckets, + count_p, + std::less{}, + &ss::metrics::histogram_bucket::count); + if (lb != h.buckets.begin()) { + auto prev = std::prev(lb); + double ratio = (count_p - prev->count) + / (lb->count - prev->count); + return prev->upper_bound + + ratio * (lb->upper_bound - prev->upper_bound); + } + return lb->upper_bound; + }; + + const auto expected_bucket = std::ranges::lower_bound( + throttle_hist.buckets, + expected_max_throttle * 2, // add some leeway + std::less{}, + &ss::metrics::histogram_bucket::upper_bound); + + auto throttle_90 = get_throttle_percentile(throttle_hist, 90); + BOOST_TEST_CHECK(throttle_90 <= expected_bucket->upper_bound); + } }; +FIXTURE_TEST( + test_node_throughput_v2_limits_no_throttle_seq, throughput_limits_fixure) { + test_throughput(honour_throttle::no, execution::seq); +} + +FIXTURE_TEST( + test_node_throughput_v2_limits_with_throttle_seq, throughput_limits_fixure) { + test_throughput(honour_throttle::yes, execution::seq); +} + +FIXTURE_TEST( + test_node_throughput_v2_limits_no_throttle_par, throughput_limits_fixure) { + test_throughput(honour_throttle::no, execution::par); +} + +FIXTURE_TEST( + test_node_throughput_v2_limits_with_throttle_par, throughput_limits_fixure) { + test_throughput(honour_throttle::yes, execution::par); +} + FIXTURE_TEST(test_node_throughput_limits_static, throughput_limits_fixure) { // configure constexpr int64_t pershard_rate_limit_in = 9_KiB; constexpr int64_t pershard_rate_limit_out = 7_KiB; constexpr size_t batch_size = 256; + config_set("kafka_throughput_throttling_v2", false); config_set( "kafka_throughput_limit_node_in_bps", std::make_optional(pershard_rate_limit_in * ss::smp::count)); @@ -442,6 +795,7 @@ FIXTURE_TEST(test_node_throughput_limits_balanced, throughput_limits_fixure) { constexpr int64_t rate_limit_in = 9_KiB; constexpr int64_t rate_limit_out = 7_KiB; constexpr size_t batch_size = 256; + config_set("kafka_throughput_throttling_v2", false); config_set( "kafka_throughput_limit_node_in_bps", std::make_optional(rate_limit_in)); config_set( diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index cc089f6c09483..a49ab7042f0e6 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1586,7 +1586,7 @@ void application::wire_up_redpanda_services( // metrics and quota management syschecks::systemd_message("Adding kafka quota managers").get(); construct_service(quota_mgr).get(); - construct_service(snc_quota_mgr).get(); + construct_service(snc_quota_mgr, std::ref(snc_node_quota)).get(); syschecks::systemd_message("Creating auditing subsystem").get(); construct_service( diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index bc5f9307aaa0b..68d2824e3204d 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -29,6 +29,7 @@ #include "kafka/client/configuration.h" #include "kafka/client/fwd.h" #include "kafka/server/fwd.h" +#include "kafka/server/snc_quota_manager.h" #include "metrics/aggregate_metrics_watcher.h" #include "metrics/metrics.h" #include "net/conn_quota.h" @@ -152,6 +153,7 @@ class application { ss::sharded coordinator_ntp_mapper; ss::sharded group_router; ss::sharded quota_mgr; + kafka::snc_quota_manager::buckets_t snc_node_quota; ss::sharded snc_quota_mgr; ss::sharded rm_group_frontend; ss::sharded usage_manager;