Skip to content

Commit

Permalink
kafka/server: Introduce throughput throttling v2
Browse files Browse the repository at this point in the history
When `kafka_throughput_throttling_v2` is enabled (true by default),
use an `ss::internal::shared_token_bucket` for requests across all
shards, instead of balancing quota between shards.

Signed-off-by: Ben Pope <ben@redpanda.com>
(cherry picked from commit 6ddfb32)

Conflicts:
  src/v/kafka/server/snc_quota_manager.h (header include)
  • Loading branch information
BenPope committed Mar 4, 2024
1 parent 6b6b3b8 commit 0edfbe5
Show file tree
Hide file tree
Showing 7 changed files with 481 additions and 17 deletions.
15 changes: 15 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,21 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::user},
std::nullopt,
{.min = 1})
, kafka_throughput_throttling_v2(
*this,
"kafka_throughput_throttling_v2",
"Use throughput throttling based on a shared token bucket instead of "
"balancing quota between shards",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
true)
, kafka_throughput_replenish_threshold(
*this,
"kafka_throughput_replenish_threshold",
"Threshold for refilling the token bucket. Will be clamped between 1 and "
"kafka_throughput_limit_node_*_bps.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1})
, kafka_quota_balancer_window(
*this,
"kafka_quota_balancer_window_ms",
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ struct configuration final : public config_store {
bounded_property<std::optional<int64_t>> kafka_throughput_limit_node_in_bps;
bounded_property<std::optional<int64_t>>
kafka_throughput_limit_node_out_bps;
property<bool> kafka_throughput_throttling_v2;
bounded_property<std::optional<int64_t>>
kafka_throughput_replenish_threshold;
bounded_property<std::chrono::milliseconds> kafka_quota_balancer_window;
bounded_property<std::chrono::milliseconds>
kafka_quota_balancer_node_period;
Expand Down
107 changes: 93 additions & 14 deletions src/v/kafka/server/snc_quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "config/configuration.h"
#include "kafka/server/logger.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/future-util.h"
#include "ssx/sharded_ptr.h"
#include "tristate.h"

#include <seastar/core/metrics.hh>
Expand Down Expand Up @@ -138,11 +140,29 @@ quota_t node_to_shard_quota(const std::optional<quota_t> node_quota) {
}
}

auto update_node_bucket(
ssx::sharded_ptr<kafka::snc_quota_manager::bucket_t>& b,
config::binding<std::optional<int64_t>> const& cfg) {
if (!cfg().has_value()) {
return b.reset();
}
uint64_t rate = *cfg();
if (b && b->rate() == rate) {
return ss::make_ready_future();
}
uint64_t limit = rate;
uint64_t threshold = config::shard_local_cfg()
.kafka_throughput_replenish_threshold()
.value_or(1);
return b.reset(rate, limit, threshold, false);
};

} // namespace

snc_quota_manager::snc_quota_manager()
snc_quota_manager::snc_quota_manager(buckets_t& node_quota)
: _max_kafka_throttle_delay(
config::shard_local_cfg().max_kafka_throttle_delay_ms.bind())
, _use_throttling_v2(config::shard_local_cfg().kafka_throughput_throttling_v2)
, _kafka_throughput_limit_node_bps{
config::shard_local_cfg().kafka_throughput_limit_node_in_bps.bind(),
config::shard_local_cfg().kafka_throughput_limit_node_out_bps.bind()}
Expand All @@ -163,13 +183,29 @@ snc_quota_manager::snc_quota_manager()
_kafka_quota_balancer_window()},
.eg {node_to_shard_quota(_node_quota_default.eg),
_kafka_quota_balancer_window()}}
, _node_quota{node_quota}
, _probe(*this)
{
update_shard_quota_minimum();
_kafka_throughput_limit_node_bps.in.watch(
[this] { update_node_quota_default(); });
_kafka_throughput_limit_node_bps.eg.watch(
[this] { update_node_quota_default(); });
_use_throttling_v2.watch([this]() { update_balance_config(); });
_kafka_throughput_limit_node_bps.in.watch([this] {
update_node_quota_default();
if (ss::this_shard_id() == quota_balancer_shard) {
ssx::spawn_with_gate(_balancer_gate, [this] {
return update_node_bucket(
_node_quota.in, _kafka_throughput_limit_node_bps.in);
});
}
});
_kafka_throughput_limit_node_bps.eg.watch([this] {
update_node_quota_default();
if (ss::this_shard_id() == quota_balancer_shard) {
ssx::spawn_with_gate(_balancer_gate, [this] {
return update_node_bucket(
_node_quota.eg, _kafka_throughput_limit_node_bps.eg);
});
}
});
_kafka_quota_balancer_window.watch([this] {
const auto v = _kafka_quota_balancer_window();
vlog(klog.debug, "qm - Set shard TP token bucket window: {}", v);
Expand Down Expand Up @@ -199,16 +235,23 @@ ss::future<> snc_quota_manager::start() {
if (ss::this_shard_id() == quota_balancer_shard) {
_balancer_timer.arm(
ss::lowres_clock::now() + get_quota_balancer_node_period());
co_await update_node_bucket(
_node_quota.in, _kafka_throughput_limit_node_bps.in);
co_await update_node_bucket(
_node_quota.eg, _kafka_throughput_limit_node_bps.eg);
}
return ss::make_ready_future<>();
}

ss::future<> snc_quota_manager::stop() {
if (ss::this_shard_id() == quota_balancer_shard) {
_balancer_timer.cancel();
return _balancer_gate.close();
} else {
return ss::make_ready_future<>();
co_await _balancer_gate.close();
if (_node_quota.in) {
co_await _node_quota.in.stop();
}
if (_node_quota.eg) {
co_await _node_quota.eg.stop();
}
}
}

Expand All @@ -229,6 +272,17 @@ delay_t eval_delay(const bottomless_token_bucket& tb) noexcept {
return delay_t(muldiv(-tb.tokens(), delay_t::period::den, tb.quota()));
}

/// Evaluate throttling delay required based on the state of a token bucket
delay_t eval_node_delay(
const ssx::sharded_ptr<snc_quota_manager::bucket_t>& tbp) noexcept {
if (!tbp) {
return delay_t::zero();
}
auto& tb = *tbp;
return std::chrono::duration_cast<delay_t>(
tb.duration_for(tb.deficiency(tb.grab(0))));
}

} // namespace

ingress_egress_state<std::optional<quota_t>>
Expand Down Expand Up @@ -303,9 +357,16 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays(

// throttling delay the connection should be requested to throttle
// this time
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
if (_use_throttling_v2()) {
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(
eval_node_delay(_node_quota.in), eval_node_delay(_node_quota.eg)));
} else {
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
}
ctx._throttled_until = now + res.request;

_probe.record_throttle_time(
Expand All @@ -321,7 +382,14 @@ void snc_quota_manager::record_request_receive(
if (ctx._exempt) {
return;
}
_shard_quota.in.use(request_size, now);
if (_use_throttling_v2()) {
if (_node_quota.in) {
_node_quota.in->replenish(now);
_node_quota.in->grab(request_size);
}
} else {
_shard_quota.in.use(request_size, now);
}
}

void snc_quota_manager::record_request_intake(
Expand All @@ -339,7 +407,14 @@ void snc_quota_manager::record_response(
if (ctx._exempt) {
return;
}
_shard_quota.eg.use(request_size, now);
if (_use_throttling_v2()) {
if (_node_quota.eg) {
_node_quota.eg->replenish(now);
_node_quota.eg->grab(request_size);
}
} else {
_shard_quota.eg.use(request_size, now);
}
_probe.rec_traffic_eg(request_size);
}

Expand Down Expand Up @@ -844,6 +919,10 @@ void snc_quota_manager::update_balance_config() {
if (_balancer_gate.is_closed()) {
return;
}
if (_use_throttling_v2()) {
_balancer_timer.cancel();
return;
}
if (_balancer_timer.cancel()) {
// cancel() returns true only on the balancer shard
// because the timer is never armed on the others
Expand Down
15 changes: 13 additions & 2 deletions src/v/kafka/server/snc_quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "config/throughput_control_group.h"
#include "metrics/metrics.h"
#include "seastarx.h"
#include "ssx/sharded_ptr.h"
#include "utils/bottomless_token_bucket.h"
#include "utils/log_hist.h"
#include "utils/mutex.h"
Expand All @@ -23,6 +24,7 @@
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/shared_token_bucket.hh>

#include <chrono>
#include <optional>
Expand Down Expand Up @@ -105,8 +107,15 @@ class snc_quota_manager
public:
using clock = ss::lowres_clock;
using quota_t = bottomless_token_bucket::quota_t;

snc_quota_manager();
using bucket_t = ss::internal::shared_token_bucket<
uint64_t,
std::ratio<1>,
ss::internal::capped_release::no,
clock>;
using buckets_t = kafka::ingress_egress_state<
ssx::sharded_ptr<kafka::snc_quota_manager::bucket_t>>;

explicit snc_quota_manager(buckets_t& node_quota);
snc_quota_manager(const snc_quota_manager&) = delete;
snc_quota_manager& operator=(const snc_quota_manager&) = delete;
snc_quota_manager(snc_quota_manager&&) = delete;
Expand Down Expand Up @@ -216,6 +225,7 @@ class snc_quota_manager
void update_balance_config();
// configuration
config::binding<std::chrono::milliseconds> _max_kafka_throttle_delay;
config::binding<bool> _use_throttling_v2;
ingress_egress_state<config::binding<std::optional<quota_t>>>
_kafka_throughput_limit_node_bps;
config::binding<std::chrono::milliseconds> _kafka_quota_balancer_window;
Expand All @@ -237,6 +247,7 @@ class snc_quota_manager
ingress_egress_state<std::optional<quota_t>> _node_quota_default;
ingress_egress_state<quota_t> _shard_quota_minimum;
ingress_egress_state<bottomless_token_bucket> _shard_quota;
buckets_t& _node_quota;

// service
mutable snc_quotas_probe _probe;
Expand Down
Loading

0 comments on commit 0edfbe5

Please sign in to comment.