diff --git a/src/v/cluster/ntp_callbacks.h b/src/v/cluster/ntp_callbacks.h index 2c733862f6fb..8b520a86b3c2 100644 --- a/src/v/cluster/ntp_callbacks.h +++ b/src/v/cluster/ntp_callbacks.h @@ -12,6 +12,7 @@ #pragma once #include "cluster/notification.h" +#include "container/chunked_hash_map.h" #include "model/fundamental.h" #include @@ -148,7 +149,7 @@ class ntp_callbacks { template struct node { callbacks_t callbacks; - absl::flat_hash_map next; + chunked_hash_map next; }; notification_id_type diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index f98e318e534b..2001a35ca34c 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -442,7 +442,7 @@ read_result::memory_units_t reserve_memory_units( static void fill_fetch_responses( op_context& octx, std::vector results, - const std::vector& responses, + const chunked_vector& responses, op_context::latency_point start_time, bool record_latency = true) { auto range = boost::irange(0, results.size()); @@ -553,7 +553,7 @@ static void fill_fetch_responses( static ss::future> fetch_ntps_in_parallel( cluster::partition_manager& cluster_pm, const replica_selector& replica_selector, - std::vector ntp_fetch_configs, + chunked_vector ntp_fetch_configs, read_distribution_probe& read_probe, bool foreign_read, std::optional deadline, @@ -721,7 +721,7 @@ class fetch_worker { std::optional deadline; // The fetch sub-requests of partitions local to the shard this worker // is running on. - std::vector requests; + chunked_vector requests; // References to services local to the shard this worker is running on. // They are protected from deletion by the coordinator. @@ -789,7 +789,7 @@ class fetch_worker { }; ss::future - query_requests(std::vector requests) { + query_requests(chunked_vector requests) { // The last visible indexes need to be populated before partitions // are read. If they are populated afterwards then the // last_visible_index could be updated after the partition is read, @@ -910,10 +910,10 @@ class fetch_worker { size_t total_size{0}; for (;;) { - std::vector requests; + chunked_vector requests; if (first_run) { - requests = _ctx.requests; + requests = _ctx.requests.copy(); } else { requests_map.clear(); @@ -1191,7 +1191,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { shard = fetch.shard, min_fetch_bytes, foreign_read, - configs = fetch.requests, + configs = fetch.requests.copy(), &octx](cluster::partition_manager& mgr) mutable -> ss::future { // Although this and octx are captured by reference across diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 5418bb5a2e30..30ba25e69473 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -10,6 +10,7 @@ */ #pragma once #include "cluster/rm_stm.h" +#include "container/fragmented_vector.h" #include "container/intrusive_list_helpers.h" #include "kafka/protocol/fetch.h" #include "kafka/server/handlers/fetch/replica_selector.h" @@ -329,8 +330,10 @@ struct read_result { // struct aggregating fetch requests and corresponding response iterators for // the same shard struct shard_fetch { - explicit shard_fetch(op_context::latency_point start_time) - : start_time{start_time} {} + explicit shard_fetch( + ss::shard_id shard_id, op_context::latency_point start_time) + : shard(shard_id) + , start_time{start_time} {} void push_back( ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) { @@ -345,8 +348,8 @@ struct shard_fetch { } ss::shard_id shard; - std::vector requests; - std::vector responses; + chunked_vector requests; + chunked_vector responses; op_context::latency_point start_time; friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) { @@ -359,9 +362,10 @@ struct fetch_plan { explicit fetch_plan( size_t shards, op_context::latency_point start_time = op_context::latency_clock::now()) - : fetches_per_shard(shards, shard_fetch(start_time)) { - for (size_t i = 0; i < fetches_per_shard.size(); i++) { - fetches_per_shard[i].shard = i; + : fetches_per_shard() { + fetches_per_shard.reserve(shards); + for (size_t i = 0; i < shards; i++) { + fetches_per_shard.emplace_back(i, start_time); } }