Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some oversized alloc high partition count improvements #24578

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/v/cluster/ntp_callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "cluster/notification.h"
#include "container/chunked_hash_map.h"
#include "model/fundamental.h"

#include <absl/container/flat_hash_map.h>
Expand Down Expand Up @@ -148,7 +149,7 @@ class ntp_callbacks {
template<typename Key, typename Value>
struct node {
callbacks_t callbacks;
absl::flat_hash_map<Key, Value> next;
chunked_hash_map<Key, Value> next;
};

notification_id_type
Expand Down
14 changes: 7 additions & 7 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ read_result::memory_units_t reserve_memory_units(
static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
const std::vector<op_context::response_placeholder_ptr>& responses,
const chunked_vector<op_context::response_placeholder_ptr>& responses,
op_context::latency_point start_time,
bool record_latency = true) {
auto range = boost::irange<size_t>(0, results.size());
Expand Down Expand Up @@ -553,7 +553,7 @@ static void fill_fetch_responses(
static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
cluster::partition_manager& cluster_pm,
const replica_selector& replica_selector,
std::vector<ntp_fetch_config> ntp_fetch_configs,
chunked_vector<ntp_fetch_config> ntp_fetch_configs,
read_distribution_probe& read_probe,
bool foreign_read,
std::optional<model::timeout_clock::time_point> deadline,
Expand Down Expand Up @@ -721,7 +721,7 @@ class fetch_worker {
std::optional<model::timeout_clock::time_point> deadline;
// The fetch sub-requests of partitions local to the shard this worker
// is running on.
std::vector<ntp_fetch_config> requests;
chunked_vector<ntp_fetch_config> requests;

// References to services local to the shard this worker is running on.
// They are protected from deletion by the coordinator.
Expand Down Expand Up @@ -789,7 +789,7 @@ class fetch_worker {
};

ss::future<query_results>
query_requests(std::vector<ntp_fetch_config> requests) {
query_requests(chunked_vector<ntp_fetch_config> requests) {
// The last visible indexes need to be populated before partitions
// are read. If they are populated afterwards then the
// last_visible_index could be updated after the partition is read,
Expand Down Expand Up @@ -910,10 +910,10 @@ class fetch_worker {
size_t total_size{0};

for (;;) {
std::vector<ntp_fetch_config> requests;
chunked_vector<ntp_fetch_config> requests;

if (first_run) {
requests = _ctx.requests;
requests = _ctx.requests.copy();
} else {
requests_map.clear();

Expand Down Expand Up @@ -1191,7 +1191,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
shard = fetch.shard,
min_fetch_bytes,
foreign_read,
configs = fetch.requests,
configs = fetch.requests.copy(),
&octx](cluster::partition_manager& mgr) mutable
-> ss::future<fetch_worker::worker_result> {
// Although this and octx are captured by reference across
Expand Down
18 changes: 11 additions & 7 deletions src/v/kafka/server/handlers/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#pragma once
#include "cluster/rm_stm.h"
#include "container/fragmented_vector.h"
#include "container/intrusive_list_helpers.h"
#include "kafka/protocol/fetch.h"
#include "kafka/server/handlers/fetch/replica_selector.h"
Expand Down Expand Up @@ -329,8 +330,10 @@ struct read_result {
// struct aggregating fetch requests and corresponding response iterators for
// the same shard
struct shard_fetch {
explicit shard_fetch(op_context::latency_point start_time)
: start_time{start_time} {}
explicit shard_fetch(
ss::shard_id shard_id, op_context::latency_point start_time)
: shard(shard_id)
, start_time{start_time} {}

void push_back(
ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) {
Expand All @@ -345,8 +348,8 @@ struct shard_fetch {
}

ss::shard_id shard;
std::vector<ntp_fetch_config> requests;
std::vector<op_context::response_placeholder_ptr> responses;
chunked_vector<ntp_fetch_config> requests;
chunked_vector<op_context::response_placeholder_ptr> responses;
op_context::latency_point start_time;

friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
Expand All @@ -359,9 +362,10 @@ struct fetch_plan {
explicit fetch_plan(
size_t shards,
op_context::latency_point start_time = op_context::latency_clock::now())
: fetches_per_shard(shards, shard_fetch(start_time)) {
for (size_t i = 0; i < fetches_per_shard.size(); i++) {
fetches_per_shard[i].shard = i;
: fetches_per_shard() {
travisdowns marked this conversation as resolved.
Show resolved Hide resolved
fetches_per_shard.reserve(shards);
for (size_t i = 0; i < shards; i++) {
fetches_per_shard.emplace_back(i, start_time);
}
}

Expand Down
Loading