Skip to content

Commit

Permalink
fetch: Switch to chunked_vector for shard_fetch.requests
Browse files Browse the repository at this point in the history
`ntp_fetch_config` is 250 bytes. When doing super wide reads of
500-1000k partitions (quite niche) this goes above the oversized alloc
threshold. Switch to chunked_vector.

Switch responses too for uniformity.
  • Loading branch information
StephanDollberg committed Dec 19, 2024
1 parent 4a16a54 commit ff22b10
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
14 changes: 7 additions & 7 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ read_result::memory_units_t reserve_memory_units(
static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
const std::vector<op_context::response_placeholder_ptr>& responses,
const chunked_vector<op_context::response_placeholder_ptr>& responses,
op_context::latency_point start_time,
bool record_latency = true) {
auto range = boost::irange<size_t>(0, results.size());
Expand Down Expand Up @@ -553,7 +553,7 @@ static void fill_fetch_responses(
static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
cluster::partition_manager& cluster_pm,
const replica_selector& replica_selector,
std::vector<ntp_fetch_config> ntp_fetch_configs,
chunked_vector<ntp_fetch_config> ntp_fetch_configs,
read_distribution_probe& read_probe,
bool foreign_read,
std::optional<model::timeout_clock::time_point> deadline,
Expand Down Expand Up @@ -721,7 +721,7 @@ class fetch_worker {
std::optional<model::timeout_clock::time_point> deadline;
// The fetch sub-requests of partitions local to the shard this worker
// is running on.
std::vector<ntp_fetch_config> requests;
chunked_vector<ntp_fetch_config> requests;

// References to services local to the shard this worker is running on.
// They are protected from deletion by the coordinator.
Expand Down Expand Up @@ -789,7 +789,7 @@ class fetch_worker {
};

ss::future<query_results>
query_requests(std::vector<ntp_fetch_config> requests) {
query_requests(chunked_vector<ntp_fetch_config> requests) {
// The last visible indexes need to be populated before partitions
// are read. If they are populated afterwards then the
// last_visible_index could be updated after the partition is read,
Expand Down Expand Up @@ -910,10 +910,10 @@ class fetch_worker {
size_t total_size{0};

for (;;) {
std::vector<ntp_fetch_config> requests;
chunked_vector<ntp_fetch_config> requests;

if (first_run) {
requests = _ctx.requests;
requests = _ctx.requests.copy();
} else {
requests_map.clear();

Expand Down Expand Up @@ -1191,7 +1191,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
shard = fetch.shard,
min_fetch_bytes,
foreign_read,
configs = fetch.requests,
configs = fetch.requests.copy(),
&octx](cluster::partition_manager& mgr) mutable
-> ss::future<fetch_worker::worker_result> {
// Although this and octx are captured by reference across
Expand Down
18 changes: 11 additions & 7 deletions src/v/kafka/server/handlers/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#pragma once
#include "cluster/rm_stm.h"
#include "container/fragmented_vector.h"
#include "container/intrusive_list_helpers.h"
#include "kafka/protocol/fetch.h"
#include "kafka/server/handlers/fetch/replica_selector.h"
Expand Down Expand Up @@ -329,8 +330,10 @@ struct read_result {
// struct aggregating fetch requests and corresponding response iterators for
// the same shard
struct shard_fetch {
explicit shard_fetch(op_context::latency_point start_time)
: start_time{start_time} {}
explicit shard_fetch(
ss::shard_id shard_id, op_context::latency_point start_time)
: shard(shard_id)
, start_time{start_time} {}

void push_back(
ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) {
Expand All @@ -345,8 +348,8 @@ struct shard_fetch {
}

ss::shard_id shard;
std::vector<ntp_fetch_config> requests;
std::vector<op_context::response_placeholder_ptr> responses;
chunked_vector<ntp_fetch_config> requests;
chunked_vector<op_context::response_placeholder_ptr> responses;
op_context::latency_point start_time;

friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
Expand All @@ -359,9 +362,10 @@ struct fetch_plan {
explicit fetch_plan(
size_t shards,
op_context::latency_point start_time = op_context::latency_clock::now())
: fetches_per_shard(shards, shard_fetch(start_time)) {
for (size_t i = 0; i < fetches_per_shard.size(); i++) {
fetches_per_shard[i].shard = i;
: fetches_per_shard() {
fetches_per_shard.reserve(shards);
for (size_t i = 0; i < shards; i++) {
fetches_per_shard.emplace_back(i, start_time);
}
}

Expand Down

0 comments on commit ff22b10

Please sign in to comment.