From bd30a7401616210ccf26f661959ea7433b4113a0 Mon Sep 17 00:00:00 2001
From: Stephan Dollberg
Date: Mon, 16 Dec 2024 10:06:17 +0000
Subject: [PATCH] fetch: Switch to chunked_vector for shard_fetch.requests

`ntp_fetch_config` is 250 bytes. When doing super-wide reads of
500-1000k partitions (quite niche), the allocation for
shard_fetch.requests goes above the oversized allocation threshold.

Switch to chunked_vector. Switch responses too for uniformity.
---
 src/v/kafka/server/handlers/fetch.cc | 14 +++++++-------
 src/v/kafka/server/handlers/fetch.h  | 18 +++++++++++-------
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc
index f98e318e534bf..2001a35ca34cd 100644
--- a/src/v/kafka/server/handlers/fetch.cc
+++ b/src/v/kafka/server/handlers/fetch.cc
@@ -442,7 +442,7 @@ read_result::memory_units_t reserve_memory_units(
 static void fill_fetch_responses(
   op_context& octx,
   std::vector<read_result> results,
-  const std::vector<op_context::response_placeholder_ptr>& responses,
+  const chunked_vector<op_context::response_placeholder_ptr>& responses,
   op_context::latency_point start_time,
   bool record_latency = true) {
     auto range = boost::irange<size_t>(0, results.size());
@@ -553,7 +553,7 @@ static void fill_fetch_responses(
 static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
   cluster::partition_manager& cluster_pm,
   const replica_selector& replica_selector,
-  std::vector<ntp_fetch_config> ntp_fetch_configs,
+  chunked_vector<ntp_fetch_config> ntp_fetch_configs,
   read_distribution_probe& read_probe,
   bool foreign_read,
   std::optional deadline,
@@ -721,7 +721,7 @@ class fetch_worker {
     std::optional deadline;
     // The fetch sub-requests of partitions local to the shard this worker
     // is running on.
-    std::vector<ntp_fetch_config> requests;
+    chunked_vector<ntp_fetch_config> requests;
     // References to services local to the shard this worker is running on.
     // They are protected from deletion by the coordinator.
@@ -789,7 +789,7 @@ class fetch_worker {
     };
 
     ss::future
-    query_requests(std::vector<ntp_fetch_config> requests) {
+    query_requests(chunked_vector<ntp_fetch_config> requests) {
         // The last visible indexes need to be populated before partitions
         // are read. If they are populated afterwards then the
         // last_visible_index could be updated after the partition is read,
@@ -910,10 +910,10 @@ class fetch_worker {
         size_t total_size{0};
 
         for (;;) {
-            std::vector<ntp_fetch_config> requests;
+            chunked_vector<ntp_fetch_config> requests;
 
             if (first_run) {
-                requests = _ctx.requests;
+                requests = _ctx.requests.copy();
             } else {
                 requests_map.clear();
@@ -1191,7 +1191,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
           shard = fetch.shard,
           min_fetch_bytes,
           foreign_read,
-          configs = fetch.requests,
+          configs = fetch.requests.copy(),
           &octx](cluster::partition_manager& mgr) mutable
           -> ss::future {
             // Although this and octx are captured by reference across
diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h
index 5418bb5a2e302..30ba25e69473c 100644
--- a/src/v/kafka/server/handlers/fetch.h
+++ b/src/v/kafka/server/handlers/fetch.h
@@ -10,6 +10,7 @@
  */
 #pragma once
 #include "cluster/rm_stm.h"
+#include "container/fragmented_vector.h"
 #include "container/intrusive_list_helpers.h"
 #include "kafka/protocol/fetch.h"
 #include "kafka/server/handlers/fetch/replica_selector.h"
@@ -329,8 +330,10 @@ struct read_result {
 // struct aggregating fetch requests and corresponding response iterators for
 // the same shard
 struct shard_fetch {
-    explicit shard_fetch(op_context::latency_point start_time)
-      : start_time{start_time} {}
+    explicit shard_fetch(
+      ss::shard_id shard_id, op_context::latency_point start_time)
+      : shard(shard_id)
+      , start_time{start_time} {}
 
     void push_back(
       ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) {
@@ -345,8 +348,8 @@ struct shard_fetch {
     }
 
     ss::shard_id shard;
-    std::vector<ntp_fetch_config> requests;
-    std::vector<op_context::response_placeholder_ptr> responses;
+    chunked_vector<ntp_fetch_config> requests;
+    chunked_vector<op_context::response_placeholder_ptr> responses;
     op_context::latency_point start_time;
 
     friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
@@ -359,9 +362,10 @@ struct fetch_plan {
     explicit fetch_plan(
      size_t shards,
      op_context::latency_point start_time = op_context::latency_clock::now())
-      : fetches_per_shard(shards, shard_fetch(start_time)) {
-        for (size_t i = 0; i < fetches_per_shard.size(); i++) {
-            fetches_per_shard[i].shard = i;
+      : fetches_per_shard() {
+        fetches_per_shard.reserve(shards);
+        for (size_t i = 0; i < shards; i++) {
+            fetches_per_shard.emplace_back(i, start_time);
         }
     }
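
What follows is a minimal, self-contained sketch of the idea behind this
switch, for illustration only. toy_chunked_vector, fake_fetch_config and the
32 KiB fragment size are invented names and numbers; this is not Redpanda's
chunked_vector from container/fragmented_vector.h. The sketch shows why a
fragmented container sidesteps the oversized contiguous allocation described
in the commit message, and why copies must be requested explicitly via
.copy(), mirroring the fetch.requests.copy() calls the diff adds (which
suggest the real type is not implicitly copyable).

#include <cstddef>
#include <cstdio>
#include <memory>
#include <vector>

// Illustrative only: a toy fragmented vector in the spirit of chunked_vector,
// NOT Redpanda's implementation. Elements live in fixed-size fragments, so
// growing to hundreds of thousands of ~250-byte entries never asks the
// allocator for one huge contiguous block the way std::vector does.
template<typename T, size_t FragmentBytes = 32 * 1024>
class toy_chunked_vector {
    static constexpr size_t elems_per_fragment
      = FragmentBytes / sizeof(T) ? FragmentBytes / sizeof(T) : 1;

public:
    toy_chunked_vector() = default;
    // Move-only, mirroring what the explicit .copy() calls in the patch imply:
    // accidental deep copies do not compile, moves just steal the fragment
    // pointers.
    toy_chunked_vector(const toy_chunked_vector&) = delete;
    toy_chunked_vector& operator=(const toy_chunked_vector&) = delete;
    toy_chunked_vector(toy_chunked_vector&&) noexcept = default;
    toy_chunked_vector& operator=(toy_chunked_vector&&) noexcept = default;

    void push_back(T v) {
        if (_size == _fragments.size() * elems_per_fragment) {
            // Grow by one fixed-size fragment instead of reallocating and
            // copying a single contiguous buffer.
            auto frag = std::make_unique<std::vector<T>>();
            frag->reserve(elems_per_fragment);
            _fragments.push_back(std::move(frag));
        }
        _fragments.back()->push_back(std::move(v));
        ++_size;
    }

    T& operator[](size_t i) {
        return (*_fragments[i / elems_per_fragment])[i % elems_per_fragment];
    }

    size_t size() const { return _size; }

    // A deep copy has to be requested explicitly, as with
    // fetch.requests.copy() in the diff above.
    toy_chunked_vector copy() const {
        toy_chunked_vector out;
        for (size_t i = 0; i < _size; ++i) {
            out.push_back(
              (*_fragments[i / elems_per_fragment])[i % elems_per_fragment]);
        }
        return out;
    }

private:
    std::vector<std::unique_ptr<std::vector<T>>> _fragments;
    size_t _size = 0;
};

// Stand-in for the ~250-byte ntp_fetch_config mentioned in the commit message.
struct fake_fetch_config {
    char payload[250];
};

int main() {
    toy_chunked_vector<fake_fetch_config> configs;
    // "Super wide" fetch: 500k sub-requests, ~125 MB of config data in total,
    // yet no single element allocation exceeds ~32 KiB; only the small vector
    // of fragment pointers grows contiguously.
    for (size_t i = 0; i < 500'000; ++i) {
        configs.push_back(fake_fetch_config{});
    }
    std::printf("stored %zu configs in 32 KiB fragments\n", configs.size());
    return 0;
}

Growing by fixed-size fragments keeps every element allocation small and keeps
push_back cheap: the worst case allocates one new fragment rather than
reallocating and copying the whole contiguous buffer the way std::vector does
when it runs out of capacity.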