Skip to content

Commit

Permalink
kafka/server/snc_quota_manager: Small refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Feb 28, 2024
1 parent 679804c commit 55d8952
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
31 changes: 17 additions & 14 deletions src/v/kafka/server/snc_quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,8 @@ snc_quota_manager::snc_quota_manager()
_shard_quota.in.set_width(v);
_shard_quota.eg.set_width(v);
});
_kafka_quota_balancer_node_period.watch([this] {
if (_balancer_gate.is_closed()) {
return;
}
if (_balancer_timer.cancel()) {
// cancel() returns true only on the balancer shard
// because the timer is never armed on the others
arm_balancer_timer();
}
// if the balancer is disabled, this is where the quotas are reset to
// default. This needs to be called on every shard because the effective
// balance is updated directly in this case.
update_node_quota_default();
});
_kafka_quota_balancer_node_period.watch(
[this]() { update_balance_config(); });
_kafka_quota_balancer_min_shard_throughput_ratio.watch(
[this] { update_shard_quota_minimum(); });
_kafka_quota_balancer_min_shard_throughput_bps.watch(
Expand Down Expand Up @@ -852,6 +840,21 @@ void snc_quota_manager::adjust_quota(
vlog(klog.trace, "qm - Adjust quota: {} -> {}", delta, _shard_quota);
}

void snc_quota_manager::update_balance_config() {
if (_balancer_gate.is_closed()) {
return;
}
if (_balancer_timer.cancel()) {
// cancel() returns true only on the balancer shard
// because the timer is never armed on the others
arm_balancer_timer();
}
// if the balancer is disabled, this is where the quotas are reset to
// default. This needs to be called on every shard because the effective
// balance is updated directly in this case.
update_node_quota_default();
}

} // namespace kafka

struct parseless_formatter {
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/snc_quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class snc_quota_manager
void adjust_quota(const ingress_egress_state<quota_t>& delta) noexcept;

private:
void update_balance_config();
// configuration
config::binding<std::chrono::milliseconds> _max_kafka_throttle_delay;
ingress_egress_state<config::binding<std::optional<quota_t>>>
Expand Down

0 comments on commit 55d8952

Please sign in to comment.