Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] Health report collection improvements #17864

1 change: 1 addition & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "cluster/feature_manager.h"
#include "cluster/fwd.h"
#include "cluster/health_manager.h"
#include "cluster/health_monitor_backend.h"
#include "cluster/health_monitor_frontend.h"
#include "cluster/logger.h"
#include "cluster/members_backend.h"
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/feature_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cluster/cluster_utils.h"
#include "cluster/commands.h"
#include "cluster/controller_service.h"
#include "cluster/health_monitor_backend.h"
#include "cluster/health_monitor_frontend.h"
#include "cluster/health_monitor_types.h"
#include "cluster/logger.h"
Expand Down
87 changes: 42 additions & 45 deletions src/v/cluster/health_monitor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) {
max_metadata_age(),
[timeout](controller_client_protocol client) mutable {
return client.collect_node_health_report(
get_node_health_request{.filter = node_report_filter{}},
rpc::client_opts(timeout));
get_node_health_request{}, rpc::client_opts(timeout));
})
.then(&rpc::get_ctx_data<get_node_health_reply>)
.then([this, id](result<get_node_health_reply> reply) {
Expand Down Expand Up @@ -435,7 +434,8 @@ ss::future<std::error_code> health_monitor_backend::collect_cluster_health() {
auto reports = co_await ssx::async_transform(
ids.begin(), ids.end(), [this](model::node_id id) {
if (id == _self) {
return collect_current_node_health(node_report_filter{});
return _report_collection_mutex.with(
[this] { return collect_current_node_health(); });
}
return collect_remote_node_health(id);
});
Expand Down Expand Up @@ -514,8 +514,8 @@ ss::future<std::error_code> health_monitor_backend::collect_cluster_health() {
}

ss::future<result<node_health_report>>
health_monitor_backend::collect_current_node_health(node_report_filter filter) {
vlog(clusterlog.debug, "collecting health report with filter: {}", filter);
health_monitor_backend::collect_current_node_health() {
vlog(clusterlog.debug, "collecting health report");
node_health_report ret;
ret.id = _self;

Expand All @@ -525,36 +525,55 @@ health_monitor_backend::collect_current_node_health(node_report_filter filter) {

ret.drain_status = co_await _drain_manager.local().status();
ret.include_drain_status = true;
ret.topics = co_await collect_topic_status();

if (filter.include_partitions) {
ret.topics = co_await collect_topic_status(
std::move(filter.ntp_filters));
}
auto [it, _] = _status.try_emplace(ret.id);
it->second.is_alive = alive::yes;
it->second.last_reply_timestamp = ss::lowres_clock::now();

co_return ret;
}
/**
 * Returns this node's health report, preferring the cached copy in _reports.
 *
 * On a cache miss the report is collected under _report_collection_mutex so
 * that only one fiber performs the (potentially expensive) collection at a
 * time; concurrent callers wait on the mutex and re-check the cache once it
 * is released.
 *
 * @return the cached or freshly collected node_health_report (or an error
 *         result propagated from collect_current_node_health()).
 */
ss::future<result<node_health_report>>
health_monitor_backend::get_current_node_health() {
    vlog(clusterlog.debug, "getting current node health");

    // Fast path: serve the report straight from the cache if present.
    auto it = _reports.find(_self);
    if (it != _reports.end()) {
        co_return it->second;
    }

    // Try to become the collecting fiber without waiting.
    auto u = _report_collection_mutex.try_get_units();
    if (!u) {
        // A collection is already in flight — wait for the mutex and then
        // re-check the cache, since the collector may have populated it
        // while we were waiting (TODO confirm the collector updates
        // _reports; otherwise we fall through and collect ourselves).
        vlog(
          clusterlog.debug,
          "report collection in progress, waiting for report to be available");
        u.emplace(co_await _report_collection_mutex.get_units());
        auto it = _reports.find(_self);
        if (it != _reports.end()) {
            co_return it->second;
        }
    }

    // Still no cached report; collect one now. The mutex units held in `u`
    // keep other callers waiting until this collection finishes.
    co_return co_await collect_current_node_health();
}

namespace {

// Per-partition status entry paired with its topic namespace; one is
// produced for every partition hosted on a shard and later grouped by topic.
struct ntp_report {
    model::topic_namespace tp_ns; // namespace/topic the partition belongs to
    partition_status status;      // health status of the single partition
};

chunked_vector<ntp_report> collect_shard_local_reports(
partition_manager& pm, const partitions_filter& filters) {
chunked_vector<ntp_report> collect_shard_local_reports(partition_manager& pm) {
chunked_vector<ntp_report> reports;
// empty filter, collect all
if (filters.namespaces.empty()) {
reports.reserve(pm.partitions().size());
std::transform(
pm.partitions().begin(),
pm.partitions().end(),
std::back_inserter(reports),
[](auto& p) {
return ntp_report {

reports.reserve(pm.partitions().size());
std::transform(
pm.partitions().begin(),
pm.partitions().end(),
std::back_inserter(reports),
[](auto& p) {
return ntp_report {
.tp_ns = model::topic_namespace(p.first.ns, p.first.tp.topic),
.status = partition_status{
.id = p.first.tp.partition,
Expand All @@ -569,27 +588,7 @@ chunked_vector<ntp_report> collect_shard_local_reports(
= p.second->reclaimable_size_bytes(),
},
};
});
} else {
for (const auto& [ntp, partition] : pm.partitions()) {
if (filters.matches(ntp)) {
reports.push_back(ntp_report{
.tp_ns = model::topic_namespace(ntp.ns, ntp.tp.topic),
.status = partition_status{
.id = ntp.tp.partition,
.term = partition->term(),
.leader_id = partition->get_leader_id(),
.revision_id = partition->get_revision_id(),
.size_bytes = partition->size_bytes()
+ partition->non_log_disk_size_bytes(),
.under_replicated_replicas
= partition->get_under_replicated(),
.reclaimable_size_bytes
= partition->reclaimable_size_bytes(),
}});
}
}
}
});

return reports;
}
Expand All @@ -606,11 +605,9 @@ reports_acc_t reduce_reports_map(
}
} // namespace
ss::future<chunked_vector<topic_status>>
health_monitor_backend::collect_topic_status(partitions_filter filters) {
health_monitor_backend::collect_topic_status() {
auto reports_map = co_await _partition_manager.map_reduce0(
[&filters](partition_manager& pm) {
return collect_shard_local_reports(pm, filters);
},
[](partition_manager& pm) { return collect_shard_local_reports(pm); },
reports_acc_t{},
&reduce_reports_map);

Expand Down
13 changes: 9 additions & 4 deletions src/v/cluster/health_monitor_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ class health_monitor_backend {
ss::future<storage::disk_space_alert> get_cluster_disk_health(
force_refresh refresh, model::timeout_clock::time_point deadline);

ss::future<result<node_health_report>>
collect_current_node_health(node_report_filter);
ss::future<result<node_health_report>> collect_current_node_health();
/**
 * Returns the cached version of the current node health, or collects it if
 * it is not available in the cache.
*/
ss::future<result<node_health_report>> get_current_node_health();

cluster::notification_id_type register_node_callback(health_node_cb_t cb);
void unregister_node_callback(cluster::notification_id_type id);
Expand Down Expand Up @@ -126,8 +130,7 @@ class health_monitor_backend {
std::optional<node_health_report>
build_node_report(model::node_id, const node_report_filter&);

ss::future<chunked_vector<topic_status>>
collect_topic_status(partitions_filter);
ss::future<chunked_vector<topic_status>> collect_topic_status();

result<node_health_report>
process_node_reply(model::node_id, result<get_node_health_reply>);
Expand Down Expand Up @@ -192,6 +195,8 @@ class health_monitor_backend {
_node_callbacks;
cluster::notification_id_type _next_callback_id{0};

mutex _report_collection_mutex{"health_report_collection"};

friend struct health_report_accessor;
};
} // namespace cluster
15 changes: 8 additions & 7 deletions src/v/cluster/health_monitor_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#include "cluster/health_monitor_frontend.h"

#include "cluster/health_monitor_backend.h"
#include "cluster/logger.h"
#include "config/property.h"
#include "model/timeout_clock.h"
Expand Down Expand Up @@ -63,14 +64,14 @@ storage::disk_space_alert health_monitor_frontend::get_cluster_disk_health() {
return _cluster_disk_health;
}

// Collcts and returns current node health report according to provided
// filters list
/**
* Gets cached or collects a node health report.
*/
ss::future<result<node_health_report>>
health_monitor_frontend::collect_node_health(node_report_filter f) {
return dispatch_to_backend(
[f = std::move(f)](health_monitor_backend& be) mutable {
return be.collect_current_node_health(std::move(f));
});
health_monitor_frontend::get_current_node_health() {
return dispatch_to_backend([](health_monitor_backend& be) mutable {
return be.get_current_node_health();
});
}
std::optional<alive>
health_monitor_frontend::is_alive(model::node_id id) const {
Expand Down
12 changes: 3 additions & 9 deletions src/v/cluster/health_monitor_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
*/
#pragma once
#include "cluster/fwd.h"
#include "cluster/health_monitor_backend.h"
#include "cluster/health_monitor_types.h"
#include "cluster/node_status_table.h"
#include "config/property.h"
Expand All @@ -21,9 +20,6 @@

#include <seastar/core/sharded.hh>

#include <chrono>
#include <utility>

namespace cluster {

/**
Expand Down Expand Up @@ -63,10 +59,8 @@ class health_monitor_frontend

storage::disk_space_alert get_cluster_disk_health();

// Collcts and returns current node health report according to provided
// filters list
ss::future<result<node_health_report>>
collect_node_health(node_report_filter);
// Returns the cached version of the current node health report, or collects
// a fresh one if no cached report is available.
ss::future<result<node_health_report>> get_current_node_health();

/**
* Return drain status for a given node.
Expand Down Expand Up @@ -99,7 +93,7 @@ class health_monitor_frontend
template<typename Func>
auto dispatch_to_backend(Func&& f) {
return _backend.invoke_on(
health_monitor_backend::shard, std::forward<Func>(f));
health_monitor_backend_shard, std::forward<Func>(f));
}

ss::sharded<health_monitor_backend>& _backend;
Expand Down
27 changes: 23 additions & 4 deletions src/v/cluster/health_monitor_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ bool operator==(const topic_status& a, const topic_status& b) {
}

std::ostream& operator<<(std::ostream& o, const topic_status& tl) {
fmt::print(o, "{{topic: {}, leaders: {}}}", tl.tp_ns, tl.partitions);
fmt::print(o, "{{topic: {}, partitions: {}}}", tl.tp_ns, tl.partitions);
return o;
}

Expand Down Expand Up @@ -237,9 +237,8 @@ std::ostream& operator<<(std::ostream& o, const partitions_filter& filter) {
return o;
}

std::ostream& operator<<(std::ostream& o, const get_node_health_request& r) {
fmt::print(
o, "{{filter: {}, current_version: {}}}", r.filter, r.current_version);
std::ostream& operator<<(std::ostream& o, const get_node_health_request&) {
fmt::print(o, "{{}}");
return o;
}

Expand All @@ -263,4 +262,24 @@ std::ostream& operator<<(std::ostream& o, const get_cluster_health_reply& r) {
return o;
}

// Human-readable formatter for cluster_health_overview: prints the overview
// fields as a single braced key/value list (note: `nodes` is ho.all_nodes).
std::ostream& operator<<(std::ostream& o, const cluster_health_overview& ho) {
    fmt::print(
      o,
      "{{controller_id: {}, nodes: {}, unhealthy_reasons: {}, nodes_down: {}, "
      "nodes_in_recovery_mode: {}, bytes_in_cloud_storage: {}, "
      "leaderless_count: {}, under_replicated_count: {}, "
      "leaderless_partitions: {}, under_replicated_partitions: {}}}",
      ho.controller_id,
      ho.all_nodes,
      ho.unhealthy_reasons,
      ho.nodes_down,
      ho.nodes_in_recovery_mode,
      ho.bytes_in_cloud_storage,
      ho.leaderless_count,
      ho.under_replicated_count,
      ho.leaderless_partitions,
      ho.under_replicated_partitions);
    return o;
}

} // namespace cluster
30 changes: 15 additions & 15 deletions src/v/cluster/health_monitor_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

namespace cluster {

static constexpr ss::shard_id health_monitor_backend_shard = 0;
/**
* Health reports
*/
Expand Down Expand Up @@ -275,6 +276,9 @@ struct cluster_health_overview {
std::vector<model::ntp> under_replicated_partitions;
size_t under_replicated_count{};
std::optional<size_t> bytes_in_cloud_storage;

friend std::ostream&
operator<<(std::ostream&, const cluster_health_overview&);
};

using include_partitions_info = ss::bool_class<struct include_partitions_tag>;
Expand Down Expand Up @@ -348,23 +352,13 @@ using force_refresh = ss::bool_class<struct hm_force_refresh_tag>;
* RPC requests
*/

struct get_node_health_request
: serde::envelope<
class get_node_health_request
: public serde::envelope<
get_node_health_request,
serde::version<0>,
serde::compat_version<0>> {
public:
using rpc_adl_exempt = std::true_type;
static constexpr int8_t initial_version = 0;
// version -1: included revision id in partition status
static constexpr int8_t revision_id_version = -1;
// version -2: included size_bytes in partition status
static constexpr int8_t size_bytes_version = -2;

static constexpr int8_t current_version = size_bytes_version;

node_report_filter filter;
// this field is not serialized
int8_t decoded_version = current_version;

friend bool
operator==(const get_node_health_request&, const get_node_health_request&)
Expand All @@ -373,7 +367,14 @@ struct get_node_health_request
friend std::ostream&
operator<<(std::ostream&, const get_node_health_request&);

auto serde_fields() { return std::tie(filter); }
auto serde_fields() { return std::tie(_filter); }

private:
/**
 * This field is unused — in fact, it never was used. It was made private on
 * purpose, to prevent any accidental use.
*/
node_report_filter _filter;
};

struct get_node_health_reply
Expand All @@ -382,7 +383,6 @@ struct get_node_health_reply
serde::version<0>,
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;
static constexpr int8_t current_version = 0;

errc error = cluster::errc::success;
std::optional<node_health_report> report;
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/partition_balancer_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "cluster/partition_balancer_backend.h"

#include "cluster/health_monitor_backend.h"
#include "cluster/health_monitor_frontend.h"
#include "cluster/health_monitor_types.h"
#include "cluster/logger.h"
Expand Down
Loading
Loading