Skip to content

Commit

Permalink
Merge pull request redpanda-data#16441 from BenPope/throughput_shared…
Browse files Browse the repository at this point in the history
…_token_bucket

Improve throughput throttling
  • Loading branch information
BenPope authored Feb 29, 2024
2 parents 74d9bb7 + 6ddfb32 commit bdd7e92
Show file tree
Hide file tree
Showing 10 changed files with 844 additions and 102 deletions.
15 changes: 15 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2779,6 +2779,21 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::user},
std::nullopt,
{.min = 1})
, kafka_throughput_throttling_v2(
*this,
"kafka_throughput_throttling_v2",
"Use throughput throttling based on a shared token bucket instead of "
"balancing quota between shards",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
true)
, kafka_throughput_replenish_threshold(
*this,
"kafka_throughput_replenish_threshold",
"Threshold for refilling the token bucket. Will be clamped between 1 and "
"kafka_throughput_limit_node_*_bps.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1})
, kafka_quota_balancer_window(
*this,
"kafka_quota_balancer_window_ms",
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ struct configuration final : public config_store {
bounded_property<std::optional<int64_t>> kafka_throughput_limit_node_in_bps;
bounded_property<std::optional<int64_t>>
kafka_throughput_limit_node_out_bps;
property<bool> kafka_throughput_throttling_v2;
bounded_property<std::optional<int64_t>>
kafka_throughput_replenish_threshold;
bounded_property<std::chrono::milliseconds> kafka_quota_balancer_window;
bounded_property<std::chrono::milliseconds>
kafka_quota_balancer_node_period;
Expand Down
156 changes: 127 additions & 29 deletions src/v/kafka/server/snc_quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
#include "config/configuration.h"
#include "kafka/server/logger.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/future-util.h"
#include "ssx/sharded_ptr.h"
#include "tristate.h"

#include <seastar/core/metrics.hh>

#include <fmt/core.h>
#include <fmt/ranges.h>

#include <chrono>
#include <iterator>
#include <memory>
#include <numeric>
Expand Down Expand Up @@ -94,10 +97,21 @@ void snc_quotas_probe::setup_metrics() {
}
metric_defs.emplace_back(sm::make_counter(
"traffic_intake",
_traffic_in,
_traffic.in,
sm::description("Amount of Kafka traffic received from the clients "
"that is taken into processing, in bytes")));

metric_defs.emplace_back(sm::make_counter(
"traffic_egress",
_traffic.eg,
sm::description("Amount of Kafka traffic published to the clients "
"that was taken into processing, in bytes")));

metric_defs.emplace_back(sm::make_histogram(
"throttle_time",
[this] { return get_throttle_time(); },
sm::description("Throttle time histogram (in seconds)")));

_metrics.add_group(
prometheus_sanitize::metrics_name("kafka:quotas"),
metric_defs,
Expand Down Expand Up @@ -126,11 +140,29 @@ quota_t node_to_shard_quota(const std::optional<quota_t> node_quota) {
}
}

auto update_node_bucket(
ssx::sharded_ptr<kafka::snc_quota_manager::bucket_t>& b,
config::binding<std::optional<int64_t>> const& cfg) {
if (!cfg().has_value()) {
return b.reset();
}
uint64_t rate = *cfg();
if (b && b->rate() == rate) {
return ss::make_ready_future();
}
uint64_t limit = rate;
uint64_t threshold = config::shard_local_cfg()
.kafka_throughput_replenish_threshold()
.value_or(1);
return b.reset(rate, limit, threshold, false);
};

} // namespace

snc_quota_manager::snc_quota_manager()
snc_quota_manager::snc_quota_manager(buckets_t& node_quota)
: _max_kafka_throttle_delay(
config::shard_local_cfg().max_kafka_throttle_delay_ms.bind())
, _use_throttling_v2(config::shard_local_cfg().kafka_throughput_throttling_v2)
, _kafka_throughput_limit_node_bps{
config::shard_local_cfg().kafka_throughput_limit_node_in_bps.bind(),
config::shard_local_cfg().kafka_throughput_limit_node_out_bps.bind()}
Expand All @@ -151,33 +183,37 @@ snc_quota_manager::snc_quota_manager()
_kafka_quota_balancer_window()},
.eg {node_to_shard_quota(_node_quota_default.eg),
_kafka_quota_balancer_window()}}
, _node_quota{node_quota}
, _probe(*this)
{
update_shard_quota_minimum();
_kafka_throughput_limit_node_bps.in.watch(
[this] { update_node_quota_default(); });
_kafka_throughput_limit_node_bps.eg.watch(
[this] { update_node_quota_default(); });
_use_throttling_v2.watch([this]() { update_balance_config(); });
_kafka_throughput_limit_node_bps.in.watch([this] {
update_node_quota_default();
if (ss::this_shard_id() == quota_balancer_shard) {
ssx::spawn_with_gate(_balancer_gate, [this] {
return update_node_bucket(
_node_quota.in, _kafka_throughput_limit_node_bps.in);
});
}
});
_kafka_throughput_limit_node_bps.eg.watch([this] {
update_node_quota_default();
if (ss::this_shard_id() == quota_balancer_shard) {
ssx::spawn_with_gate(_balancer_gate, [this] {
return update_node_bucket(
_node_quota.eg, _kafka_throughput_limit_node_bps.eg);
});
}
});
_kafka_quota_balancer_window.watch([this] {
const auto v = _kafka_quota_balancer_window();
vlog(klog.debug, "qm - Set shard TP token bucket window: {}", v);
_shard_quota.in.set_width(v);
_shard_quota.eg.set_width(v);
});
_kafka_quota_balancer_node_period.watch([this] {
if (_balancer_gate.is_closed()) {
return;
}
if (_balancer_timer.cancel()) {
// cancel() returns true only on the balancer shard
// because the timer is never armed on the others
arm_balancer_timer();
}
// if the balancer is disabled, this is where the quotas are reset to
// default. This needs to be called on every shard because the effective
// balance is updated directly in this case.
update_node_quota_default();
});
_kafka_quota_balancer_node_period.watch(
[this]() { update_balance_config(); });
_kafka_quota_balancer_min_shard_throughput_ratio.watch(
[this] { update_shard_quota_minimum(); });
_kafka_quota_balancer_min_shard_throughput_bps.watch(
Expand All @@ -199,16 +235,23 @@ ss::future<> snc_quota_manager::start() {
if (ss::this_shard_id() == quota_balancer_shard) {
_balancer_timer.arm(
ss::lowres_clock::now() + get_quota_balancer_node_period());
co_await update_node_bucket(
_node_quota.in, _kafka_throughput_limit_node_bps.in);
co_await update_node_bucket(
_node_quota.eg, _kafka_throughput_limit_node_bps.eg);
}
return ss::make_ready_future<>();
}

ss::future<> snc_quota_manager::stop() {
if (ss::this_shard_id() == quota_balancer_shard) {
_balancer_timer.cancel();
return _balancer_gate.close();
} else {
return ss::make_ready_future<>();
co_await _balancer_gate.close();
if (_node_quota.in) {
co_await _node_quota.in.stop();
}
if (_node_quota.eg) {
co_await _node_quota.eg.stop();
}
}
}

Expand All @@ -229,6 +272,17 @@ delay_t eval_delay(const bottomless_token_bucket& tb) noexcept {
return delay_t(muldiv(-tb.tokens(), delay_t::period::den, tb.quota()));
}

/// Evaluate throttling delay required based on the state of a token bucket
delay_t eval_node_delay(
const ssx::sharded_ptr<snc_quota_manager::bucket_t>& tbp) noexcept {
if (!tbp) {
return delay_t::zero();
}
auto& tb = *tbp;
return std::chrono::duration_cast<delay_t>(
tb.duration_for(tb.deficiency(tb.grab(0))));
}

} // namespace

ingress_egress_state<std::optional<quota_t>>
Expand Down Expand Up @@ -303,11 +357,21 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays(

// throttling delay the connection should be requested to throttle
// this time
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
if (_use_throttling_v2()) {
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(
eval_node_delay(_node_quota.in), eval_node_delay(_node_quota.eg)));
} else {
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
}
ctx._throttled_until = now + res.request;

_probe.record_throttle_time(
std::chrono::duration_cast<std::chrono::microseconds>(res.request));

return res;
}

Expand All @@ -318,7 +382,14 @@ void snc_quota_manager::record_request_receive(
if (ctx._exempt) {
return;
}
_shard_quota.in.use(request_size, now);
if (_use_throttling_v2()) {
if (_node_quota.in) {
_node_quota.in->replenish(now);
_node_quota.in->grab(request_size);
}
} else {
_shard_quota.in.use(request_size, now);
}
}

void snc_quota_manager::record_request_intake(
Expand All @@ -336,7 +407,15 @@ void snc_quota_manager::record_response(
if (ctx._exempt) {
return;
}
_shard_quota.eg.use(request_size, now);
if (_use_throttling_v2()) {
if (_node_quota.eg) {
_node_quota.eg->replenish(now);
_node_quota.eg->grab(request_size);
}
} else {
_shard_quota.eg.use(request_size, now);
}
_probe.rec_traffic_eg(request_size);
}

ss::lowres_clock::duration
Expand Down Expand Up @@ -836,6 +915,25 @@ void snc_quota_manager::adjust_quota(
vlog(klog.trace, "qm - Adjust quota: {} -> {}", delta, _shard_quota);
}

void snc_quota_manager::update_balance_config() {
if (_balancer_gate.is_closed()) {
return;
}
if (_use_throttling_v2()) {
_balancer_timer.cancel();
return;
}
if (_balancer_timer.cancel()) {
// cancel() returns true only on the balancer shard
// because the timer is never armed on the others
arm_balancer_timer();
}
// if the balancer is disabled, this is where the quotas are reset to
// default. This needs to be called on every shard because the effective
// balance is updated directly in this case.
update_node_quota_default();
}

} // namespace kafka

struct parseless_formatter {
Expand Down
36 changes: 31 additions & 5 deletions src/v/kafka/server/snc_quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
#include "config/property.h"
#include "config/throughput_control_group.h"
#include "metrics/metrics.h"
#include "ssx/sharded_ptr.h"
#include "utils/bottomless_token_bucket.h"
#include "utils/log_hist.h"
#include "utils/mutex.h"

#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/shared_token_bucket.hh>

#include <chrono>
#include <optional>
Expand All @@ -48,17 +51,30 @@ class snc_quotas_probe {
~snc_quotas_probe() noexcept = default;

void rec_balancer_step() noexcept { ++_balancer_runs; }
void rec_traffic_in(const size_t bytes) noexcept { _traffic_in += bytes; }
void rec_traffic_in(const size_t bytes) noexcept { _traffic.in += bytes; }
void rec_traffic_eg(const size_t bytes) noexcept { _traffic.eg += bytes; }

void setup_metrics();

uint64_t get_balancer_runs() const noexcept { return _balancer_runs; }

auto get_traffic_in() const { return _traffic.in; }
auto get_traffic_eg() const { return _traffic.eg; }

auto record_throttle_time(std::chrono::microseconds t) {
return _throttle_time_us.record(t.count());
}

auto get_throttle_time() const {
return _throttle_time_us.public_histogram_logform();
}

private:
class snc_quota_manager& _qm;
metrics::internal_metric_groups _metrics;
uint64_t _balancer_runs = 0;
size_t _traffic_in = 0;
ingress_egress_state<size_t> _traffic = {};
log_hist_public _throttle_time_us;
};

class snc_quota_context {
Expand Down Expand Up @@ -91,8 +107,15 @@ class snc_quota_manager
public:
using clock = ss::lowres_clock;
using quota_t = bottomless_token_bucket::quota_t;

snc_quota_manager();
using bucket_t = ss::internal::shared_token_bucket<
uint64_t,
std::ratio<1>,
ss::internal::capped_release::no,
clock>;
using buckets_t = kafka::ingress_egress_state<
ssx::sharded_ptr<kafka::snc_quota_manager::bucket_t>>;

explicit snc_quota_manager(buckets_t& node_quota);
snc_quota_manager(const snc_quota_manager&) = delete;
snc_quota_manager& operator=(const snc_quota_manager&) = delete;
snc_quota_manager(snc_quota_manager&&) = delete;
Expand Down Expand Up @@ -199,8 +222,10 @@ class snc_quota_manager
void adjust_quota(const ingress_egress_state<quota_t>& delta) noexcept;

private:
void update_balance_config();
// configuration
config::binding<std::chrono::milliseconds> _max_kafka_throttle_delay;
config::binding<bool> _use_throttling_v2;
ingress_egress_state<config::binding<std::optional<quota_t>>>
_kafka_throughput_limit_node_bps;
config::binding<std::chrono::milliseconds> _kafka_quota_balancer_window;
Expand All @@ -222,9 +247,10 @@ class snc_quota_manager
ingress_egress_state<std::optional<quota_t>> _node_quota_default;
ingress_egress_state<quota_t> _shard_quota_minimum;
ingress_egress_state<bottomless_token_bucket> _shard_quota;
buckets_t& _node_quota;

// service
snc_quotas_probe _probe;
mutable snc_quotas_probe _probe;
};

// Names exposed in this namespace are for unit test integration only
Expand Down
Loading

0 comments on commit bdd7e92

Please sign in to comment.