diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index ff36696fbf50..79761e4aafe3 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -31,6 +31,7 @@ #include "cluster/feature_manager.h" #include "cluster/fwd.h" #include "cluster/health_manager.h" +#include "cluster/health_monitor_backend.h" #include "cluster/health_monitor_frontend.h" #include "cluster/logger.h" #include "cluster/members_backend.h" diff --git a/src/v/cluster/feature_manager.cc b/src/v/cluster/feature_manager.cc index 3b2a270554c7..09fe5f2ac290 100644 --- a/src/v/cluster/feature_manager.cc +++ b/src/v/cluster/feature_manager.cc @@ -14,6 +14,7 @@ #include "cluster/cluster_utils.h" #include "cluster/commands.h" #include "cluster/controller_service.h" +#include "cluster/health_monitor_backend.h" #include "cluster/health_monitor_frontend.h" #include "cluster/health_monitor_types.h" #include "cluster/logger.h" diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 7987c57b8607..00a6c6464460 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -363,8 +363,7 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) { max_metadata_age(), [timeout](controller_client_protocol client) mutable { return client.collect_node_health_report( - get_node_health_request{.filter = node_report_filter{}}, - rpc::client_opts(timeout)); + get_node_health_request{}, rpc::client_opts(timeout)); }) .then(&rpc::get_ctx_data) .then([this, id](result reply) { @@ -435,7 +434,8 @@ ss::future health_monitor_backend::collect_cluster_health() { auto reports = co_await ssx::async_transform( ids.begin(), ids.end(), [this](model::node_id id) { if (id == _self) { - return collect_current_node_health(node_report_filter{}); + return _report_collection_mutex.with( + [this] { return collect_current_node_health(); }); } return collect_remote_node_health(id); }); @@ -514,8 +514,8 @@ ss::future health_monitor_backend::collect_cluster_health() { } ss::future> -health_monitor_backend::collect_current_node_health(node_report_filter filter) { - vlog(clusterlog.debug, "collecting health report with filter: {}", filter); +health_monitor_backend::collect_current_node_health() { + vlog(clusterlog.debug, "collecting health report"); node_health_report ret; ret.id = _self; @@ -525,17 +525,38 @@ health_monitor_backend::collect_current_node_health(node_report_filter filter) { ret.drain_status = co_await _drain_manager.local().status(); ret.include_drain_status = true; + ret.topics = co_await collect_topic_status(); - if (filter.include_partitions) { - ret.topics = co_await collect_topic_status( - std::move(filter.ntp_filters)); - } auto [it, _] = _status.try_emplace(ret.id); it->second.is_alive = alive::yes; it->second.last_reply_timestamp = ss::lowres_clock::now(); co_return ret; } +ss::future> +health_monitor_backend::get_current_node_health() { + vlog(clusterlog.debug, "getting current node health"); + + auto it = _reports.find(_self); + if (it != _reports.end()) { + co_return it->second; + } + + auto u = _report_collection_mutex.try_get_units(); + if (!u) { + vlog( + clusterlog.debug, + "report collection in progress, waiting for report to be available"); + u.emplace(co_await _report_collection_mutex.get_units()); + auto it = _reports.find(_self); + if (it != _reports.end()) { + co_return it->second; + } + } + + co_return co_await collect_current_node_health(); +} + namespace { struct ntp_report { @@ -543,18 +564,16 @@ 
struct ntp_report { partition_status status; }; -chunked_vector collect_shard_local_reports( - partition_manager& pm, const partitions_filter& filters) { +chunked_vector collect_shard_local_reports(partition_manager& pm) { chunked_vector reports; - // empty filter, collect all - if (filters.namespaces.empty()) { - reports.reserve(pm.partitions().size()); - std::transform( - pm.partitions().begin(), - pm.partitions().end(), - std::back_inserter(reports), - [](auto& p) { - return ntp_report { + + reports.reserve(pm.partitions().size()); + std::transform( + pm.partitions().begin(), + pm.partitions().end(), + std::back_inserter(reports), + [](auto& p) { + return ntp_report { .tp_ns = model::topic_namespace(p.first.ns, p.first.tp.topic), .status = partition_status{ .id = p.first.tp.partition, @@ -569,27 +588,7 @@ chunked_vector collect_shard_local_reports( = p.second->reclaimable_size_bytes(), }, }; - }); - } else { - for (const auto& [ntp, partition] : pm.partitions()) { - if (filters.matches(ntp)) { - reports.push_back(ntp_report{ - .tp_ns = model::topic_namespace(ntp.ns, ntp.tp.topic), - .status = partition_status{ - .id = ntp.tp.partition, - .term = partition->term(), - .leader_id = partition->get_leader_id(), - .revision_id = partition->get_revision_id(), - .size_bytes = partition->size_bytes() - + partition->non_log_disk_size_bytes(), - .under_replicated_replicas - = partition->get_under_replicated(), - .reclaimable_size_bytes - = partition->reclaimable_size_bytes(), - }}); - } - } - } + }); return reports; } @@ -606,11 +605,9 @@ reports_acc_t reduce_reports_map( } } // namespace ss::future> -health_monitor_backend::collect_topic_status(partitions_filter filters) { +health_monitor_backend::collect_topic_status() { auto reports_map = co_await _partition_manager.map_reduce0( - [&filters](partition_manager& pm) { - return collect_shard_local_reports(pm, filters); - }, + [](partition_manager& pm) { return collect_shard_local_reports(pm); }, reports_acc_t{}, &reduce_reports_map); diff --git a/src/v/cluster/health_monitor_backend.h b/src/v/cluster/health_monitor_backend.h index 799f045a5394..3fcf7675f409 100644 --- a/src/v/cluster/health_monitor_backend.h +++ b/src/v/cluster/health_monitor_backend.h @@ -68,8 +68,12 @@ class health_monitor_backend { ss::future get_cluster_disk_health( force_refresh refresh, model::timeout_clock::time_point deadline); - ss::future> - collect_current_node_health(node_report_filter); + ss::future> collect_current_node_health(); + /** + * Returns the cached version of the current node health, or collects it if + * it is not available in the cache.
+ */ + ss::future> get_current_node_health(); cluster::notification_id_type register_node_callback(health_node_cb_t cb); void unregister_node_callback(cluster::notification_id_type id); @@ -126,8 +130,7 @@ class health_monitor_backend { std::optional build_node_report(model::node_id, const node_report_filter&); - ss::future> - collect_topic_status(partitions_filter); + ss::future> collect_topic_status(); result process_node_reply(model::node_id, result); @@ -192,6 +195,8 @@ class health_monitor_backend { _node_callbacks; cluster::notification_id_type _next_callback_id{0}; + mutex _report_collection_mutex{"health_report_collection"}; + friend struct health_report_accessor; }; } // namespace cluster diff --git a/src/v/cluster/health_monitor_frontend.cc b/src/v/cluster/health_monitor_frontend.cc index e7ab6d8b757d..d1bf9b39b862 100644 --- a/src/v/cluster/health_monitor_frontend.cc +++ b/src/v/cluster/health_monitor_frontend.cc @@ -10,6 +10,7 @@ */ #include "cluster/health_monitor_frontend.h" +#include "cluster/health_monitor_backend.h" #include "cluster/logger.h" #include "config/property.h" #include "model/timeout_clock.h" @@ -63,14 +64,14 @@ storage::disk_space_alert health_monitor_frontend::get_cluster_disk_health() { return _cluster_disk_health; } -// Collcts and returns current node health report according to provided -// filters list +/** + * Returns the cached node health report, or collects a new one if it is not + * available. + */ ss::future> -health_monitor_frontend::collect_node_health(node_report_filter f) { - return dispatch_to_backend( - [f = std::move(f)](health_monitor_backend& be) mutable { - return be.collect_current_node_health(std::move(f)); - }); +health_monitor_frontend::get_current_node_health() { + return dispatch_to_backend([](health_monitor_backend& be) mutable { + return be.get_current_node_health(); + }); } std::optional health_monitor_frontend::is_alive(model::node_id id) const { diff --git a/src/v/cluster/health_monitor_frontend.h b/src/v/cluster/health_monitor_frontend.h index f23c320dfb99..ece4754ad624 100644 --- a/src/v/cluster/health_monitor_frontend.h +++ b/src/v/cluster/health_monitor_frontend.h @@ -10,7 +10,6 @@ */ #pragma once #include "cluster/fwd.h" -#include "cluster/health_monitor_backend.h" #include "cluster/health_monitor_types.h" #include "cluster/node_status_table.h" #include "config/property.h" @@ -21,9 +20,6 @@ #include -#include -#include - namespace cluster { /** @@ -63,10 +59,8 @@ class health_monitor_frontend storage::disk_space_alert get_cluster_disk_health(); - // Collcts and returns current node health report according to provided - // filters list - ss::future> - collect_node_health(node_report_filter); + // Returns a cached version of the current node health report, or collects + // a new one. + ss::future> get_current_node_health(); /** * Return drain status for a given node.
@@ -99,7 +93,7 @@ class health_monitor_frontend template auto dispatch_to_backend(Func&& f) { return _backend.invoke_on( - health_monitor_backend::shard, std::forward(f)); + health_monitor_backend_shard, std::forward(f)); } ss::sharded& _backend; diff --git a/src/v/cluster/health_monitor_types.cc b/src/v/cluster/health_monitor_types.cc index 80b8ff0c95d3..be2e2a15e042 100644 --- a/src/v/cluster/health_monitor_types.cc +++ b/src/v/cluster/health_monitor_types.cc @@ -195,7 +195,7 @@ bool operator==(const topic_status& a, const topic_status& b) { } std::ostream& operator<<(std::ostream& o, const topic_status& tl) { - fmt::print(o, "{{topic: {}, leaders: {}}}", tl.tp_ns, tl.partitions); + fmt::print(o, "{{topic: {}, partitions: {}}}", tl.tp_ns, tl.partitions); return o; } @@ -237,9 +237,8 @@ std::ostream& operator<<(std::ostream& o, const partitions_filter& filter) { return o; } -std::ostream& operator<<(std::ostream& o, const get_node_health_request& r) { - fmt::print( - o, "{{filter: {}, current_version: {}}}", r.filter, r.current_version); +std::ostream& operator<<(std::ostream& o, const get_node_health_request&) { + fmt::print(o, "{{}}"); return o; } @@ -263,4 +262,24 @@ std::ostream& operator<<(std::ostream& o, const get_cluster_health_reply& r) { return o; } +std::ostream& operator<<(std::ostream& o, const cluster_health_overview& ho) { + fmt::print( + o, + "{{controller_id: {}, nodes: {}, unhealthy_reasons: {}, nodes_down: {}, " + "nodes_in_recovery_mode: {}, bytes_in_cloud_storage: {}, " + "leaderless_count: {}, under_replicated_count: {}, " + "leaderless_partitions: {}, under_replicated_partitions: {}}}", + ho.controller_id, + ho.all_nodes, + ho.unhealthy_reasons, + ho.nodes_down, + ho.nodes_in_recovery_mode, + ho.bytes_in_cloud_storage, + ho.leaderless_count, + ho.under_replicated_count, + ho.leaderless_partitions, + ho.under_replicated_partitions); + return o; +} + } // namespace cluster diff --git a/src/v/cluster/health_monitor_types.h b/src/v/cluster/health_monitor_types.h index 2f33ee9e34e5..60164a0701b8 100644 --- a/src/v/cluster/health_monitor_types.h +++ b/src/v/cluster/health_monitor_types.h @@ -28,6 +28,7 @@ namespace cluster { +static constexpr ss::shard_id health_monitor_backend_shard = 0; /** * Health reports */ @@ -275,6 +276,9 @@ struct cluster_health_overview { std::vector under_replicated_partitions; size_t under_replicated_count{}; std::optional bytes_in_cloud_storage; + + friend std::ostream& + operator<<(std::ostream&, const cluster_health_overview&); }; using include_partitions_info = ss::bool_class; @@ -348,23 +352,13 @@ using force_refresh = ss::bool_class; * RPC requests */ -struct get_node_health_request - : serde::envelope< +class get_node_health_request + : public serde::envelope< get_node_health_request, serde::version<0>, serde::compat_version<0>> { +public: using rpc_adl_exempt = std::true_type; - static constexpr int8_t initial_version = 0; - // version -1: included revision id in partition status - static constexpr int8_t revision_id_version = -1; - // version -2: included size_bytes in partition status - static constexpr int8_t size_bytes_version = -2; - - static constexpr int8_t current_version = size_bytes_version; - - node_report_filter filter; - // this field is not serialized - int8_t decoded_version = current_version; friend bool operator==(const get_node_health_request&, const get_node_health_request&) @@ -373,7 +367,14 @@ struct get_node_health_request friend std::ostream& operator<<(std::ostream&, const get_node_health_request&); - auto 
serde_fields() { return std::tie(filter); } + auto serde_fields() { return std::tie(_filter); } + +private: + /** + * This field is no longer used, as it never was. It was made private on + * purpose + */ + node_report_filter _filter; }; struct get_node_health_reply @@ -382,7 +383,6 @@ struct get_node_health_reply serde::version<0>, serde::compat_version<0>> { using rpc_adl_exempt = std::true_type; - static constexpr int8_t current_version = 0; errc error = cluster::errc::success; std::optional report; diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 0cfb8c19c69b..2eec6fe45000 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -10,6 +10,7 @@ #include "cluster/partition_balancer_backend.h" +#include "cluster/health_monitor_backend.h" #include "cluster/health_monitor_frontend.h" #include "cluster/health_monitor_types.h" #include "cluster/logger.h" diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc index cd1533d849e4..ac0e485af61d 100644 --- a/src/v/cluster/service.cc +++ b/src/v/cluster/service.cc @@ -529,26 +529,14 @@ void clear_partition_sizes(node_health_report& report) { ss::future service::do_collect_node_health_report(get_node_health_request req) { - auto res = co_await _hm_frontend.local().collect_node_health( - std::move(req.filter)); + auto res = co_await _hm_frontend.local().get_current_node_health(); if (res.has_error()) { co_return get_node_health_reply{ .error = map_health_monitor_error_code(res.error())}; } - auto report = std::move(res.value()); - // clear all revision ids to prevent sending them to old versioned redpanda - // nodes - if (req.decoded_version > get_node_health_request::revision_id_version) { - clear_partition_revisions(report); - } - // clear all partition sizes to prevent sending them to old versioned - // redpanda nodes - if (req.decoded_version > get_node_health_request::size_bytes_version) { - clear_partition_sizes(report); - } co_return get_node_health_reply{ .error = errc::success, - .report = std::move(report), + .report = std::move(res.value()), }; } diff --git a/src/v/cluster/tests/health_monitor_test.cc b/src/v/cluster/tests/health_monitor_test.cc index 6fab15a181f0..0b45d79d54d0 100644 --- a/src/v/cluster/tests/health_monitor_test.cc +++ b/src/v/cluster/tests/health_monitor_test.cc @@ -20,6 +20,7 @@ #include "test_utils/fixture.h" #include +#include #include #include @@ -291,27 +292,6 @@ FIXTURE_TEST(test_ntp_filter, cluster_test_fixture) { report.value().node_reports.begin()->topics); }); }).get(); - - // check filtering in node report - tests::cooperative_spin_wait_with_timeout(10s, [&] { - return n1->controller->get_health_monitor() - .local() - .collect_node_health(f_1.node_report_filter) - .then([](result report) { - return report.has_value() - && contains_exactly_ntp_leaders( - g_seastar_test_log, - { - ntp(model::kafka_namespace, "tp-1", 0), - ntp(model::kafka_namespace, "tp-1", 2), - ntp(model::kafka_namespace, "tp-2", 0), - ntp(model::kafka_internal_namespace, "internal-1", 0), - ntp(model::kafka_internal_namespace, "internal-1", 1), - ntp(model::redpanda_ns, "controller", 0), - }, - report.value().topics); - }); - }).get(); } FIXTURE_TEST(test_alive_status, cluster_test_fixture) { @@ -570,3 +550,19 @@ FIXTURE_TEST(test_report_truncation, health_report_unit) { test_unhealthy(max_count + 1, LEADERLESS); test_unhealthy(max_count + 1, URP); } + +FIXTURE_TEST( + test_requesting_collection_at_the_same_time, 
cluster_test_fixture) { + auto n1 = create_node_application(model::node_id{0}); + /** + * Request reports + */ + auto f_h_1 + = n1->controller->get_health_monitor().local().get_current_node_health(); + auto f_h_2 + = n1->controller->get_health_monitor().local().get_current_node_health(); + + auto results = ss::when_all(std::move(f_h_1), std::move(f_h_2)).get(); + BOOST_REQUIRE( + std::get<0>(results).get().value() == std::get<1>(results).get().value()); +} diff --git a/src/v/cluster/tests/serialization_rt_test.cc b/src/v/cluster/tests/serialization_rt_test.cc index e0b0f6e9f3e8..42aa295997c7 100644 --- a/src/v/cluster/tests/serialization_rt_test.cc +++ b/src/v/cluster/tests/serialization_rt_test.cc @@ -1549,14 +1549,6 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { }; roundtrip_test(data); } - { - cluster::get_node_health_request data{ - .filter = { - .ntp_filters = random_partitions_filter(), - }, - }; - roundtrip_test(data); - } { storage::disk data{ .path = random_generators::gen_alphanum_string( diff --git a/src/v/compat/get_node_health_compat.h b/src/v/compat/get_node_health_compat.h index 5b37e6ba66fe..4be06c29a1b7 100644 --- a/src/v/compat/get_node_health_compat.h +++ b/src/v/compat/get_node_health_compat.h @@ -18,10 +18,7 @@ namespace compat { -GEN_COMPAT_CHECK_SERDE_ONLY( - cluster::get_node_health_request, - { json_write(filter); }, - { json_read(filter); }); +EMPTY_COMPAT_CHECK_SERDE_ONLY(cluster::get_node_health_request); template<> struct compat_check { diff --git a/src/v/compat/get_node_health_generator.h b/src/v/compat/get_node_health_generator.h index a43a63e52006..636489a70c21 100644 --- a/src/v/compat/get_node_health_generator.h +++ b/src/v/compat/get_node_health_generator.h @@ -21,10 +21,7 @@ namespace compat { template<> struct instance_generator { static cluster::get_node_health_request random() { - return cluster::get_node_health_request{ - {}, - cluster::random_node_report_filter(), - random_generators::get_int()}; + return cluster::get_node_health_request{}; } static std::vector limits() { return {}; } }; diff --git a/tests/rptest/utils/node_operations.py b/tests/rptest/utils/node_operations.py index fb3f998509a1..f28c9547cfa4 100644 --- a/tests/rptest/utils/node_operations.py +++ b/tests/rptest/utils/node_operations.py @@ -98,18 +98,6 @@ def __init__(self, node_id ] if decommissioned_node_ids == None else decommissioned_node_ids - def _nodes_with_decommission_progress_api(self): - def has_decommission_progress_api(node): - v = int_tuple( - VERSION_RE.findall(self.redpanda.get_version(node))[0]) - # decommission progress api is available since v22.3.12 - return v[0] >= 23 or (v[0] == 22 and v[1] == 3 and v[2] >= 12) - - return [ - n for n in self.redpanda.started_nodes() - if has_decommission_progress_api(n) - ] - def _dump_partition_move_available_bandwidth(self): def get_metric(self, node): try: @@ -134,7 +122,7 @@ def get_metric(self, node): def _not_decommissioned_node(self): return random.choice([ - n for n in self._nodes_with_decommission_progress_api() + n for n in self.redpanda.started_nodes() if self.redpanda.node_id(n) not in self.decommissioned_node_ids ])
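
Reviewer note (illustration only, not part of the patch): the new health_monitor_backend::get_current_node_health() combines a cached report (_reports) with _report_collection_mutex so that concurrent callers trigger at most one collection. Below is a minimal, synchronous C++ sketch of that check-cache / take-lock / re-check / collect pattern; it uses std::mutex and blocking calls instead of Seastar futures and Redpanda's mutex type, and every name in it (health_cache, report, collect_report) is illustrative rather than the actual Redpanda API. The final caching of the freshly collected report is also a simplification of what the backend does.

#include <iostream>
#include <mutex>
#include <optional>
#include <string>

struct report {
    std::string payload;
};

class health_cache {
public:
    // Mirrors the get_current_node_health() flow from the diff: serve the
    // cached report when present, otherwise serialize collection behind a
    // mutex and re-check the cache after acquiring it, since another caller
    // may have finished collecting while we waited.
    report get_current() {
        if (auto cached = lookup()) {
            return *cached;
        }
        std::scoped_lock collection_lock(_collection_mutex);
        if (auto cached = lookup()) {
            return *cached;
        }
        report r = collect_report();
        std::scoped_lock cache_lock(_cache_mutex);
        _cached = r;
        return r;
    }

private:
    std::optional<report> lookup() {
        std::scoped_lock cache_lock(_cache_mutex);
        return _cached;
    }

    report collect_report() {
        // Stand-in for the expensive per-partition status collection.
        return report{.payload = "node health"};
    }

    std::mutex _cache_mutex;      // guards _cached
    std::mutex _collection_mutex; // single-flight guard for collection
    std::optional<report> _cached;
};

int main() {
    health_cache cache;
    std::cout << cache.get_current().payload << '\n'; // collects once
    std::cout << cache.get_current().payload << '\n'; // served from the cache
    return 0;
}

The diff's new test_requesting_collection_at_the_same_time fixture test exercises exactly this concurrent case: two get_current_node_health() futures issued at the same time must resolve to equal reports.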