[v24.2.x] CORE-8394 cluster: consider shard0 reserve in check_cluster_limits #24462

Merged

src/v/cluster/scheduling/allocation_node.cc (16 additions, 14 deletions)

@@ -50,24 +50,26 @@ allocation_node::allocation_node(
       });
 }
 
-bool allocation_node::is_full(
-  const model::ntp& ntp, bool will_add_allocation) const {
-    // Internal topics are excluded from checks to prevent allocation failures
-    // when creating them. This is okay because they are fairly small in number
-    // compared to kafka user topic partitions.
+bool allocation_node::is_internal_topic(
+  const config::binding<std::vector<ss::sstring>>& internal_kafka_topics,
+  model::topic_namespace_view ntp) {
     auto is_internal_ns = ntp.ns == model::redpanda_ns
                           || ntp.ns == model::kafka_internal_namespace;
     if (is_internal_ns) {
-        return false;
+        return true;
     }
-    const auto& internal_topics = _internal_kafka_topics();
-    auto is_internal_topic = ntp.ns == model::kafka_namespace
-                             && std::any_of(
-                               internal_topics.cbegin(),
-                               internal_topics.cend(),
-                               [&ntp](const ss::sstring& topic) {
-                                   return topic == ntp.tp.topic();
-                               });
+    const auto& internal_topics = internal_kafka_topics();
+    return ntp.ns == model::kafka_namespace
+           && std::any_of(
+             internal_topics.cbegin(),
+             internal_topics.cend(),
+             [&ntp](const ss::sstring& topic) { return topic == ntp.tp; });
+}
+
+bool allocation_node::is_full(
+  const model::ntp& ntp, bool will_add_allocation) const {
+    auto is_internal_topic = allocation_node::is_internal_topic(
+      _internal_kafka_topics, model::topic_namespace_view{ntp});
 
     auto count = _allocated_partitions;
     if (will_add_allocation) {

src/v/cluster/scheduling/allocation_node.h (7 additions, 0 deletions)

@@ -126,6 +126,13 @@ class allocation_node {
     }
     bool is_full(const model::ntp&, bool will_add_allocation) const;
 
+    // Internal topics are excluded from checks to prevent allocation failures
+    // when creating them. This is okay because they are fairly small in number
+    // compared to kafka user topic partitions.
+    static bool is_internal_topic(
+      const config::binding<std::vector<ss::sstring>>& internal_kafka_topics,
+      model::topic_namespace_view ntp);
+
 private:
     friend allocation_state;
 
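
To pin down what the new helper treats as internal, here is a self-contained sketch of the same membership check using only standard-library types. The function name and the plain-string namespaces are stand-ins for Redpanda's model types and config::binding, so read it as an illustration of the logic rather than the real API.

#include <algorithm>
#include <string>
#include <string_view>
#include <vector>

// Simplified model of allocation_node::is_internal_topic: a topic is
// internal if it lives in an internal namespace, or if it sits in the
// kafka namespace and is listed in the internal-topics configuration.
bool is_internal_topic_model(
  const std::vector<std::string>& internal_kafka_topics,
  std::string_view ns,
  std::string_view topic) {
    if (ns == "redpanda" || ns == "kafka_internal") {
        return true;
    }
    return ns == "kafka"
           && std::any_of(
             internal_kafka_topics.begin(),
             internal_kafka_topics.end(),
             [&](const std::string& t) { return t == topic; });
}

For example, with internal_kafka_topics = {"__consumer_offsets"}, the pair ("kafka", "__consumer_offsets") is internal, while a user topic such as ("kafka", "orders") is not.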

src/v/cluster/scheduling/partition_allocator.cc (15 additions, 6 deletions)

@@ -109,11 +109,16 @@ allocation_constraints partition_allocator::default_constraints() {
  * with partitions that cannot be re-accommodated on smaller peers).
  */
 std::error_code partition_allocator::check_cluster_limits(
-  const uint64_t new_partitions_replicas_requested) const {
+  const uint64_t new_partitions_replicas_requested,
+  const model::topic_namespace& topic) const {
     if (_members.local().nodes().empty()) {
         // Empty members table, we're probably running in a unit test
         return errc::success;
     }
+    if (allocation_node::is_internal_topic(_internal_kafka_topics, topic)) {
+        return errc::success;
+    }
+
     // Calculate how many partition-replicas already exist, so that we can
     // check if the new topic would take us past any limits.
     uint64_t existent_partitions{0};

@@ -169,15 +174,19 @@ std::error_code partition_allocator::check_cluster_limits(
 
     // Refuse to create a partition count that would violate the per-core
     // limit.
-    const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard());
+    const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard())
+                                - (broker_count * _partitions_reserve_shard0());
     if (proposed_total_partitions > core_limit) {
         vlog(
           clusterlog.warn,
           "Refusing to create {} partitions as total partition count {} would "
-          "exceed core limit {}",
+          "exceed the core-based limit {} (per-shard limit: {}, shard0 "
+          "reservation: {})",
           new_partitions_replicas_requested,
           proposed_total_partitions,
-          effective_cpu_count * _partitions_per_shard());
+          core_limit,
+          _partitions_per_shard(),
+          _partitions_reserve_shard0());
         return errc::topic_invalid_partitions_core_limit;
     }
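
To see what the shard0 reservation changes in practice, here is a worked example using the defaults from the test fixture in this PR (1000 partitions per shard, a reservation of 2, three brokers with six cores each); the cluster shape is illustrative only, not a recommendation.

#include <cstdint>
#include <iostream>

int main() {
    // Illustrative cluster, matching the fixture defaults in this PR.
    const uint64_t broker_count = 3;
    const uint64_t cores_per_broker = 6;
    const uint64_t partitions_per_shard = 1000;
    const uint64_t partitions_reserve_shard0 = 2;

    const uint64_t effective_cpu_count = broker_count * cores_per_broker;

    // Before this change, shard0 counted at full capacity.
    const uint64_t old_limit = effective_cpu_count * partitions_per_shard;
    // Now the reservation is subtracted once per broker.
    const uint64_t new_limit = old_limit
                               - broker_count * partitions_reserve_shard0;

    std::cout << old_limit << " -> " << new_limit << "\n"; // 18000 -> 17994
}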


@@ -243,7 +252,7 @@ partition_allocator::allocate(simple_allocation_request simple_req) {
     const uint64_t create_count
       = static_cast<uint64_t>(simple_req.additional_partitions)
         * static_cast<uint64_t>(simple_req.replication_factor);
-    auto cluster_errc = check_cluster_limits(create_count);
+    auto cluster_errc = check_cluster_limits(create_count, simple_req.tp_ns);
     if (cluster_errc) {
         co_return cluster_errc;
     }
Expand Down Expand Up @@ -276,7 +285,7 @@ partition_allocator::allocate(allocation_request request) {
}
}

auto cluster_errc = check_cluster_limits(create_count);
auto cluster_errc = check_cluster_limits(create_count, request._nt);
if (cluster_errc) {
co_return cluster_errc;
}
Expand Down

src/v/cluster/scheduling/partition_allocator.h (3 additions, 1 deletion)

@@ -18,6 +18,7 @@
 #include "cluster/scheduling/types.h"
 #include "config/property.h"
 #include "features/fwd.h"
+#include "model/metadata.h"
 
 namespace cluster {
 
@@ -152,7 +153,8 @@ class partition_allocator {
     // new_partitions_replicas_requested represents the total number of
     // partitions requested by a request. i.e. partitions * replicas requested.
     std::error_code check_cluster_limits(
-      const uint64_t new_partitions_replicas_requested) const;
+      const uint64_t new_partitions_replicas_requested,
+      const model::topic_namespace& topic) const;
 
     ss::future<result<allocation_units::pointer>>
     do_allocate(allocation_request);
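
Note that the first parameter counts partition-replicas, not partitions. A quick sketch of the caller-side arithmetic, mirroring the create_count computation in allocate() with made-up request values:

#include <cassert>
#include <cstdint>

int main() {
    // Hypothetical request: 12 new partitions at replication factor 3.
    const uint64_t additional_partitions = 12;
    const uint64_t replication_factor = 3;

    // check_cluster_limits() receives the product, i.e. 36 replicas.
    const uint64_t create_count = additional_partitions * replication_factor;
    assert(create_count == 36);
}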

src/v/cluster/tests/partition_allocator_fixture.h (3 additions, 3 deletions)

@@ -32,7 +32,6 @@
 
 struct partition_allocator_fixture {
     static constexpr uint32_t partitions_per_shard = 1000;
-    static constexpr uint32_t partitions_reserve_shard0 = 2;
 
     partition_allocator_fixture()
       : partition_allocator_fixture(std::nullopt, std::nullopt) {}

@@ -68,7 +67,7 @@ struct partition_allocator_fixture {
           broker.id(),
           broker.properties().cores,
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
-          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          partitions_reserve_shard0.bind(),
           kafka_internal_topics.bind()));
     }

@@ -138,6 +137,7 @@ struct partition_allocator_fixture {
     cluster::partition_allocator& allocator() { return _allocator.local(); }
 
     config::mock_property<std::vector<ss::sstring>> kafka_internal_topics{{}};
+    config::mock_property<uint32_t> partitions_reserve_shard0{2};
     model::topic_namespace tn{model::kafka_namespace, model::topic{"test"}};
     ss::sharded<cluster::members_table> members;
     ss::sharded<features::feature_table> features;

@@ -158,7 +158,7 @@ struct partition_allocator_fixture {
           config::mock_binding<std::optional<size_t>>(memory_per_partition),
           config::mock_binding<std::optional<int32_t>>(fds_per_partition),
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
-          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          partitions_reserve_shard0.bind(),
           kafka_internal_topics.bind(),
           config::mock_binding<bool>(true))
          .get();
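
The point of swapping the static constexpr for a config::mock_property is that a mock property hands out live bindings: every allocation_node constructed with partitions_reserve_shard0.bind() observes later updates, which is what lets a test change the reservation after the fixture is built. In rough outline, assuming config::mock_property behaves as it is used in this fixture:

// Sketch only; relies on Redpanda's config::mock_property test helper.
config::mock_property<uint32_t> reserve{2};
config::binding<uint32_t> bound = reserve.bind();
// bound() == 2 here.
reserve.update(0);
// bound() == 0 now, and every other binding obtained from reserve sees 0
// too, which is what allocation_over_capacity_without_shard0 relies on.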

src/v/cluster/tests/partition_allocator_tests.cc (23 additions, 1 deletion)

@@ -194,6 +194,28 @@ FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) {
       allocator().allocate(make_allocation_request(int_2, 1, 1)).get());
 }
 
+FIXTURE_TEST(
+  allocation_over_capacity_without_shard0, partition_allocator_fixture) {
+    // Disable shard0 reservations
+    partitions_reserve_shard0.update(0);
+
+    register_node(0, 6);
+    register_node(1, 6);
+    register_node(2, 6);
+
+    saturate_all_machines();
+    auto gr = allocator().state().last_group_id();
+    BOOST_REQUIRE(
+      allocator().allocate(make_allocation_request(1, 1)).get().has_error());
+    // group id hasn't changed
+    BOOST_REQUIRE_EQUAL(allocator().state().last_group_id(), gr);
+
+    // Make the topic internal and retry, should work.
+    kafka_internal_topics.update({tn.tp()});
+    BOOST_REQUIRE(allocator().allocate(make_allocation_request(1, 1)).get());
+    BOOST_REQUIRE_GT(allocator().state().last_group_id(), gr);
+}
+
 FIXTURE_TEST(max_allocation, partition_allocator_fixture) {
     register_node(0, 2);
     register_node(1, 2);

@@ -530,7 +552,7 @@ FIXTURE_TEST(updating_nodes_properties, partition_allocator_fixture) {
     BOOST_REQUIRE_EQUAL(
       it->second->max_capacity(),
       10 * partition_allocator_fixture::partitions_per_shard
-        - partition_allocator_fixture::partitions_reserve_shard0);
+        - partitions_reserve_shard0());
 }
 
 FIXTURE_TEST(change_replication_factor, partition_allocator_fixture) {