Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] Improve throughput throttling #16848

Merged
merged 5 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,21 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::user},
std::nullopt,
{.min = 1})
, kafka_throughput_throttling_v2(
*this,
"kafka_throughput_throttling_v2",
"Use throughput throttling based on a shared token bucket instead of "
"balancing quota between shards",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
true)
, kafka_throughput_replenish_threshold(
*this,
"kafka_throughput_replenish_threshold",
"Threshold for refilling the token bucket. Will be clamped between 1 and "
"kafka_throughput_limit_node_*_bps.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1})
, kafka_quota_balancer_window(
*this,
"kafka_quota_balancer_window_ms",
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ struct configuration final : public config_store {
bounded_property<std::optional<int64_t>> kafka_throughput_limit_node_in_bps;
bounded_property<std::optional<int64_t>>
kafka_throughput_limit_node_out_bps;
property<bool> kafka_throughput_throttling_v2;
bounded_property<std::optional<int64_t>>
kafka_throughput_replenish_threshold;
bounded_property<std::chrono::milliseconds> kafka_quota_balancer_window;
bounded_property<std::chrono::milliseconds>
kafka_quota_balancer_node_period;
Expand Down
156 changes: 127 additions & 29 deletions src/v/kafka/server/snc_quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
#include "config/configuration.h"
#include "kafka/server/logger.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/future-util.h"
#include "ssx/sharded_ptr.h"
#include "tristate.h"

#include <seastar/core/metrics.hh>

#include <fmt/core.h>
#include <fmt/ranges.h>

#include <chrono>
#include <iterator>
#include <memory>
#include <numeric>
Expand Down Expand Up @@ -94,10 +97,21 @@ void snc_quotas_probe::setup_metrics() {
}
metric_defs.emplace_back(sm::make_counter(
"traffic_intake",
_traffic_in,
_traffic.in,
sm::description("Amount of Kafka traffic received from the clients "
"that is taken into processing, in bytes")));

metric_defs.emplace_back(sm::make_counter(
"traffic_egress",
_traffic.eg,
sm::description("Amount of Kafka traffic published to the clients "
"that was taken into processing, in bytes")));

metric_defs.emplace_back(sm::make_histogram(
"throttle_time",
[this] { return get_throttle_time(); },
sm::description("Throttle time histogram (in seconds)")));

_metrics.add_group(
prometheus_sanitize::metrics_name("kafka:quotas"),
metric_defs,
Expand Down Expand Up @@ -126,11 +140,29 @@ quota_t node_to_shard_quota(const std::optional<quota_t> node_quota) {
}
}

/// Bring one node-wide token bucket in line with its configured rate limit.
///
/// \param b    sharded pointer that holds the bucket to update
/// \param cfg  binding for the optional node-wide throughput limit
/// \return     the future returned by `b.reset(...)`, or an already resolved
///             future when the existing bucket runs at the configured rate
///
/// NOTE(review): only the rate is compared before taking the early exit, so
/// a change of kafka_throughput_replenish_threshold alone does not appear to
/// rebuild an existing bucket — confirm this is intended.
auto update_node_bucket(
  ssx::sharded_ptr<kafka::snc_quota_manager::bucket_t>& b,
  config::binding<std::optional<int64_t>> const& cfg) {
    const auto& configured_limit = cfg();
    if (!configured_limit.has_value()) {
        // No limit configured: reset the pointer without constructing a
        // replacement bucket.
        return b.reset();
    }

    uint64_t new_rate = *configured_limit;
    const bool rate_unchanged = b && b->rate() == new_rate;
    if (rate_unchanged) {
        return ss::make_ready_future();
    }

    // The bucket limit is set equal to the rate; the replenish threshold
    // falls back to 1 when the tunable is not set.
    uint64_t burst_limit = new_rate;
    uint64_t replenish_threshold
      = config::shard_local_cfg()
          .kafka_throughput_replenish_threshold()
          .value_or(1);
    // The trailing `false` is forwarded to bucket_t's constructor —
    // TODO confirm its meaning against the bucket implementation.
    return b.reset(new_rate, burst_limit, replenish_threshold, false);
}

} // namespace

snc_quota_manager::snc_quota_manager()
snc_quota_manager::snc_quota_manager(buckets_t& node_quota)
: _max_kafka_throttle_delay(
config::shard_local_cfg().max_kafka_throttle_delay_ms.bind())
, _use_throttling_v2(config::shard_local_cfg().kafka_throughput_throttling_v2)
, _kafka_throughput_limit_node_bps{
config::shard_local_cfg().kafka_throughput_limit_node_in_bps.bind(),
config::shard_local_cfg().kafka_throughput_limit_node_out_bps.bind()}
Expand All @@ -151,33 +183,37 @@ snc_quota_manager::snc_quota_manager()
_kafka_quota_balancer_window()},
.eg {node_to_shard_quota(_node_quota_default.eg),
_kafka_quota_balancer_window()}}
, _node_quota{node_quota}
, _probe(*this)
{
update_shard_quota_minimum();
_kafka_throughput_limit_node_bps.in.watch(
[this] { update_node_quota_default(); });
_kafka_throughput_limit_node_bps.eg.watch(
[this] { update_node_quota_default(); });
_use_throttling_v2.watch([this]() { update_balance_config(); });
_kafka_throughput_limit_node_bps.in.watch([this] {
update_node_quota_default();
if (ss::this_shard_id() == quota_balancer_shard) {
ssx::spawn_with_gate(_balancer_gate, [this] {
return update_node_bucket(
_node_quota.in, _kafka_throughput_limit_node_bps.in);
});
}
});
_kafka_throughput_limit_node_bps.eg.watch([this] {
update_node_quota_default();
if (ss::this_shard_id() == quota_balancer_shard) {
ssx::spawn_with_gate(_balancer_gate, [this] {
return update_node_bucket(
_node_quota.eg, _kafka_throughput_limit_node_bps.eg);
});
}
});
_kafka_quota_balancer_window.watch([this] {
const auto v = _kafka_quota_balancer_window();
vlog(klog.debug, "qm - Set shard TP token bucket window: {}", v);
_shard_quota.in.set_width(v);
_shard_quota.eg.set_width(v);
});
_kafka_quota_balancer_node_period.watch([this] {
if (_balancer_gate.is_closed()) {
return;
}
if (_balancer_timer.cancel()) {
// cancel() returns true only on the balancer shard
// because the timer is never armed on the others
arm_balancer_timer();
}
// if the balancer is disabled, this is where the quotas are reset to
// default. This needs to be called on every shard because the effective
// balance is updated directly in this case.
update_node_quota_default();
});
_kafka_quota_balancer_node_period.watch(
[this]() { update_balance_config(); });
_kafka_quota_balancer_min_shard_throughput_ratio.watch(
[this] { update_shard_quota_minimum(); });
_kafka_quota_balancer_min_shard_throughput_bps.watch(
Expand All @@ -199,16 +235,23 @@ ss::future<> snc_quota_manager::start() {
if (ss::this_shard_id() == quota_balancer_shard) {
_balancer_timer.arm(
ss::lowres_clock::now() + get_quota_balancer_node_period());
co_await update_node_bucket(
_node_quota.in, _kafka_throughput_limit_node_bps.in);
co_await update_node_bucket(
_node_quota.eg, _kafka_throughput_limit_node_bps.eg);
}
return ss::make_ready_future<>();
}

/// Shut the quota manager down on the calling shard.
///
/// All teardown happens on the balancer shard: the balancer timer is
/// cancelled, the balancer gate is closed, and then whichever node-wide
/// buckets are installed are stopped. Every other shard completes
/// immediately.
///
/// \return a future that resolves once the teardown above has finished
ss::future<> snc_quota_manager::stop() {
    if (ss::this_shard_id() != quota_balancer_shard) {
        co_return;
    }

    _balancer_timer.cancel();
    // Close the gate first: bucket updates are spawned under this gate, so
    // they must have drained before the buckets themselves are stopped.
    co_await _balancer_gate.close();

    if (_node_quota.in) {
        co_await _node_quota.in.stop();
    }
    if (_node_quota.eg) {
        co_await _node_quota.eg.stop();
    }
}

Expand All @@ -229,6 +272,17 @@ delay_t eval_delay(const bottomless_token_bucket& tb) noexcept {
return delay_t(muldiv(-tb.tokens(), delay_t::period::den, tb.quota()));
}

/// Compute the throttling delay implied by the current state of a node-wide
/// token bucket.
///
/// \param tbp  sharded pointer to the bucket; may be empty
/// \return     zero when no bucket is installed, otherwise the bucket's
///             `duration_for(deficiency(grab(0)))` converted to `delay_t`
delay_t eval_node_delay(
  const ssx::sharded_ptr<snc_quota_manager::bucket_t>& tbp) noexcept {
    if (tbp) {
        auto& bucket = *tbp;
        // grab(0) presumably reads the current bucket position without
        // consuming any tokens — TODO confirm against the bucket API.
        const auto position = bucket.grab(0);
        const auto missing_tokens = bucket.deficiency(position);
        return std::chrono::duration_cast<delay_t>(
          bucket.duration_for(missing_tokens));
    }
    return delay_t::zero();
}

} // namespace

ingress_egress_state<std::optional<quota_t>>
Expand Down Expand Up @@ -303,11 +357,21 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays(

// throttling delay the connection should be requested to throttle
// this time
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
if (_use_throttling_v2()) {
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(
eval_node_delay(_node_quota.in), eval_node_delay(_node_quota.eg)));
} else {
res.request = std::min(
_max_kafka_throttle_delay(),
std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
}
ctx._throttled_until = now + res.request;

_probe.record_throttle_time(
std::chrono::duration_cast<std::chrono::microseconds>(res.request));

return res;
}

Expand All @@ -318,7 +382,14 @@ void snc_quota_manager::record_request_receive(
if (ctx._exempt) {
return;
}
_shard_quota.in.use(request_size, now);
if (_use_throttling_v2()) {
if (_node_quota.in) {
_node_quota.in->replenish(now);
_node_quota.in->grab(request_size);
}
} else {
_shard_quota.in.use(request_size, now);
}
}

void snc_quota_manager::record_request_intake(
Expand All @@ -336,7 +407,15 @@ void snc_quota_manager::record_response(
if (ctx._exempt) {
return;
}
_shard_quota.eg.use(request_size, now);
if (_use_throttling_v2()) {
if (_node_quota.eg) {
_node_quota.eg->replenish(now);
_node_quota.eg->grab(request_size);
}
} else {
_shard_quota.eg.use(request_size, now);
}
_probe.rec_traffic_eg(request_size);
}

ss::lowres_clock::duration
Expand Down Expand Up @@ -836,6 +915,25 @@ void snc_quota_manager::adjust_quota(
vlog(klog.trace, "qm - Adjust quota: {} -> {}", delta, _shard_quota);
}

/// React to a change of the balancer-related configuration (the
/// throttling-v2 switch or the balancer period).
///
/// Does nothing once the balancer gate is closed. With throttling v2 enabled
/// the balancer timer is only cancelled. Otherwise the timer is re-armed if
/// it had been armed, and the node quota defaults are refreshed.
///
/// NOTE(review): after the v2 branch has cancelled the timer, nothing in
/// this function re-arms it when v2 is switched off again, because cancel()
/// then returns false — confirm the timer is restarted elsewhere.
void snc_quota_manager::update_balance_config() {
    if (_balancer_gate.is_closed()) {
        return;
    }

    const bool v2_enabled = _use_throttling_v2();
    // cancel() reports true only on the balancer shard, since the timer is
    // never armed anywhere else.
    const bool timer_was_armed = _balancer_timer.cancel();
    if (v2_enabled) {
        return;
    }
    if (timer_was_armed) {
        arm_balancer_timer();
    }

    // With the balancer disabled this is the place where quotas fall back to
    // their defaults. It has to run on every shard because the effective
    // balance is updated directly in that case.
    update_node_quota_default();
}

} // namespace kafka

struct parseless_formatter {
Expand Down
36 changes: 31 additions & 5 deletions src/v/kafka/server/snc_quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
#include "config/throughput_control_group.h"
#include "metrics/metrics.h"
#include "seastarx.h"
#include "ssx/sharded_ptr.h"
#include "utils/bottomless_token_bucket.h"
#include "utils/log_hist.h"
#include "utils/mutex.h"

#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/shared_token_bucket.hh>

#include <chrono>
#include <optional>
Expand All @@ -48,17 +51,30 @@ class snc_quotas_probe {
~snc_quotas_probe() noexcept = default;

void rec_balancer_step() noexcept { ++_balancer_runs; }
void rec_traffic_in(const size_t bytes) noexcept { _traffic_in += bytes; }
void rec_traffic_in(const size_t bytes) noexcept { _traffic.in += bytes; }
void rec_traffic_eg(const size_t bytes) noexcept { _traffic.eg += bytes; }

void setup_metrics();

uint64_t get_balancer_runs() const noexcept { return _balancer_runs; }

auto get_traffic_in() const { return _traffic.in; }
auto get_traffic_eg() const { return _traffic.eg; }

auto record_throttle_time(std::chrono::microseconds t) {
return _throttle_time_us.record(t.count());
}

auto get_throttle_time() const {
return _throttle_time_us.public_histogram_logform();
}

private:
class snc_quota_manager& _qm;
metrics::internal_metric_groups _metrics;
uint64_t _balancer_runs = 0;
size_t _traffic_in = 0;
ingress_egress_state<size_t> _traffic = {};
log_hist_public _throttle_time_us;
};

class snc_quota_context {
Expand Down Expand Up @@ -91,8 +107,15 @@ class snc_quota_manager
public:
using clock = ss::lowres_clock;
using quota_t = bottomless_token_bucket::quota_t;

snc_quota_manager();
using bucket_t = ss::internal::shared_token_bucket<
uint64_t,
std::ratio<1>,
ss::internal::capped_release::no,
clock>;
using buckets_t = kafka::ingress_egress_state<
ssx::sharded_ptr<kafka::snc_quota_manager::bucket_t>>;

explicit snc_quota_manager(buckets_t& node_quota);
snc_quota_manager(const snc_quota_manager&) = delete;
snc_quota_manager& operator=(const snc_quota_manager&) = delete;
snc_quota_manager(snc_quota_manager&&) = delete;
Expand Down Expand Up @@ -199,8 +222,10 @@ class snc_quota_manager
void adjust_quota(const ingress_egress_state<quota_t>& delta) noexcept;

private:
void update_balance_config();
// configuration
config::binding<std::chrono::milliseconds> _max_kafka_throttle_delay;
config::binding<bool> _use_throttling_v2;
ingress_egress_state<config::binding<std::optional<quota_t>>>
_kafka_throughput_limit_node_bps;
config::binding<std::chrono::milliseconds> _kafka_quota_balancer_window;
Expand All @@ -222,9 +247,10 @@ class snc_quota_manager
ingress_egress_state<std::optional<quota_t>> _node_quota_default;
ingress_egress_state<quota_t> _shard_quota_minimum;
ingress_egress_state<bottomless_token_bucket> _shard_quota;
buckets_t& _node_quota;

// service
snc_quotas_probe _probe;
mutable snc_quotas_probe _probe;
};

// Names exposed in this namespace are for unit test integration only
Expand Down
Loading
Loading