Skip to content

Commit

Permalink
Merge pull request redpanda-data#24007 from nvartolomei/nv/log-manage…
Browse files Browse the repository at this point in the history
…r-probe

storage: housekeeping metrics
  • Loading branch information
piyushredpanda authored Nov 6, 2024
2 parents 1b24006 + 08b6020 commit 3b09748
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 4 deletions.
19 changes: 19 additions & 0 deletions src/v/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ redpanda_cc_library(
"version.h",
],
implementation_deps = [
":log_manager_probe",
":logger",
"//src/v/syschecks",
],
Expand Down Expand Up @@ -441,3 +442,21 @@ redpanda_cc_library(
"@seastar",
],
)

redpanda_cc_library(
name = "log_manager_probe",
srcs = [
"log_manager_probe.cc",
],
hdrs = [
"log_manager_probe.h",
],
implementation_deps = [
"//src/v/config",
"@seastar",
],
include_prefix = "storage",
deps = [
"//src/v/metrics",
],
)
1 change: 1 addition & 0 deletions src/v/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ v_cc_library(
segment_reader.cc
segment_deduplication_utils.cc
log_manager.cc
log_manager_probe.cc
disk_log_impl.cc
disk_log_appender.cc
parser.cc
Expand Down
22 changes: 19 additions & 3 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "storage/key_offset_map.h"
#include "storage/kvstore.h"
#include "storage/log.h"
#include "storage/log_manager_probe.h"
#include "storage/logger.h"
#include "storage/segment.h"
#include "storage/segment_appender.h"
Expand Down Expand Up @@ -143,14 +144,17 @@ log_manager::log_manager(
, _feature_table(feature_table)
, _jitter(_config.compaction_interval())
, _trigger_gc_jitter(0s, 5s)
, _batch_cache(_config.reclaim_opts) {
, _batch_cache(_config.reclaim_opts)
, _probe(std::make_unique<log_manager_probe>()) {
_config.compaction_interval.watch([this]() {
_jitter = simple_time_jitter<ss::lowres_clock>{
_config.compaction_interval()};
_housekeeping_sem.signal();
});
}

log_manager::~log_manager() = default;

ss::future<> log_manager::clean_close(ss::shared_ptr<storage::log> log) {
auto clean_segment = co_await log->close();

Expand All @@ -171,6 +175,7 @@ ss::future<> log_manager::clean_close(ss::shared_ptr<storage::log> log) {
}

ss::future<> log_manager::start() {
_probe->setup_metrics();
if (unlikely(config::shard_local_cfg()
.log_disable_housekeeping_for_tests.value())) {
co_return;
Expand All @@ -195,6 +200,8 @@ ss::future<> log_manager::stop() {
co_await _compaction_hash_key_map->initialize(0);
_compaction_hash_key_map.reset();
}

_probe->clear_metrics();
}

/**
Expand Down Expand Up @@ -280,6 +287,7 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
_abort_source,
std::move(ntp_sanitizer_cfg),
_compaction_hash_key_map.get()));
_probe->housekeeping_log_processed();

// bail out of compaction early in order to get back to gc
if (_gc_triggered) {
Expand Down Expand Up @@ -359,6 +367,7 @@ ss::future<> log_manager::housekeeping_loop() {
// it is expected that callers set the flag whenever they want the
// next round of housekeeping to priortize gc.
_gc_triggered = false;
_probe->urgent_gc_run();

/*
* build a schedule of partitions to gc ordered by amount of
Expand Down Expand Up @@ -576,7 +585,7 @@ ss::future<ss::shared_ptr<log>> log_manager::do_manage(
auto [it, success] = _logs.emplace(
l->config().ntp(), std::make_unique<log_housekeeping_meta>(l));
_logs_list.push_back(*it->second);
_resources.update_partition_count(_logs.size());
update_log_count();
vassert(success, "Could not keep track of:{} - concurrency issue", l);
co_return l;
}
Expand All @@ -596,7 +605,7 @@ ss::future<> log_manager::remove(model::ntp ntp) {
vlog(stlog.info, "Asked to remove: {}", ntp);
auto g = _gate.hold();
auto handle = _logs.extract(ntp);
_resources.update_partition_count(_logs.size());
update_log_count();
if (handle.empty()) {
co_return;
}
Expand Down Expand Up @@ -879,4 +888,11 @@ gc_config log_manager::default_gc_config() const {
return {collection_threshold, _config.retention_bytes()};
}

void log_manager::update_log_count() {
auto count = _logs.size();

_resources.update_partition_count(count);
_probe->set_log_count(count);
}

} // namespace storage
9 changes: 9 additions & 0 deletions src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@

namespace storage {

class log_manager_probe;

namespace testing_details {
class log_manager_accessor;
};
Expand Down Expand Up @@ -170,6 +172,7 @@ class log_manager {
kvstore& kvstore,
storage_resources&,
ss::sharded<features::feature_table>&) noexcept;
~log_manager();

ss::future<ss::shared_ptr<log>> manage(
ntp_config,
Expand Down Expand Up @@ -280,6 +283,8 @@ class log_manager {

ss::future<> housekeeping_scan(model::timestamp);

void update_log_count();

log_config _config;
kvstore& _kvstore;
storage_resources& _resources;
Expand All @@ -293,6 +298,10 @@ class log_manager {
// Hash key-map to use across multiple compactions to reuse reserved memory
// rather than reallocating repeatedly.
std::unique_ptr<hash_key_offset_map> _compaction_hash_key_map;

// Metrics.
std::unique_ptr<log_manager_probe> _probe;

ss::gate _gate;
ss::abort_source _abort_source;

Expand Down
50 changes: 50 additions & 0 deletions src/v/storage/log_manager_probe.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "storage/log_manager_probe.h"

#include "config/configuration.h"
#include "metrics/prometheus_sanitize.h"

#include <seastar/core/metrics.hh>

namespace storage {

void log_manager_probe::setup_metrics() {
if (config::shard_local_cfg().disable_metrics()) {
return;
}

namespace sm = ss::metrics;

auto group_name = prometheus_sanitize::metrics_name("storage:manager");

_metrics.add_group(
group_name,
{
sm::make_gauge(
"logs",
[this] { return _log_count; },
sm::description("Number of logs managed")),
sm::make_counter(
"urgent_gc_runs",
[this] { return _urgent_gc_runs; },
sm::description("Number of urgent GC runs")),
sm::make_counter(
"housekeeping_log_processed",
[this] { return _housekeeping_log_processed; },
sm::description("Number of logs processed by housekeeping")),
},
{},
{});
}

void log_manager_probe::clear_metrics() { _metrics.clear(); }

} // namespace storage
45 changes: 45 additions & 0 deletions src/v/storage/log_manager_probe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "metrics/metrics.h"

#include <cstdint>

namespace storage {

/// Log manager per-shard storage probe.
class log_manager_probe {
public:
log_manager_probe() = default;
log_manager_probe(const log_manager_probe&) = delete;
log_manager_probe& operator=(const log_manager_probe&) = delete;
log_manager_probe(log_manager_probe&&) = delete;
log_manager_probe& operator=(log_manager_probe&&) = delete;
~log_manager_probe() = default;

public:
void setup_metrics();
void clear_metrics();

public:
void set_log_count(uint32_t log_count) { _log_count = log_count; }
void housekeeping_log_processed() { ++_housekeeping_log_processed; }
void urgent_gc_run() { ++_urgent_gc_runs; }

private:
uint32_t _log_count = 0;
uint64_t _urgent_gc_runs = 0;
uint64_t _housekeeping_log_processed = 0;

metrics::internal_metric_groups _metrics;
};

}; // namespace storage
23 changes: 22 additions & 1 deletion tests/rptest/tests/full_disk_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer
from rptest.services.redpanda import LoggingConfig, RedpandaService, SISettings
from rptest.services.redpanda import LoggingConfig, MetricsEndpoint, RedpandaService, SISettings
from rptest.tests.end_to_end import EndToEndTest
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import produce_total_bytes, search_logs_with_timeout
Expand Down Expand Up @@ -315,6 +315,17 @@ def observed_data_size(pred):
timeout_sec=30,
backoff_sec=2)

assert self.redpanda.metric_sum(
metric_name=
"vectorized_storage_manager_housekeeping_log_processed_total",
metrics_endpoint=MetricsEndpoint.METRICS
) == 0, "Housekeeping should not have run yet"

assert self.redpanda.metric_sum(
metric_name="vectorized_storage_manager_urgent_gc_runs_total",
metrics_endpoint=MetricsEndpoint.METRICS
) == 0, "GC should not have run yet"

# now trigger the disk space alert on the same node. unlike the 30
# second delay above, we should almost immediately observe the data
# be reclaimed from disk.
Expand All @@ -331,6 +342,16 @@ def observed_data_size(pred):
timeout_sec=10,
backoff_sec=2)

assert self.redpanda.metric_sum(
metric_name=
"vectorized_storage_manager_housekeeping_log_processed_total",
metrics_endpoint=MetricsEndpoint.METRICS
) > 0, "Housekeeping should have run"

assert self.redpanda.metric_sum(
metric_name="vectorized_storage_manager_urgent_gc_runs_total",
metrics_endpoint=MetricsEndpoint.METRICS) > 0, "GC should have run"


class LocalDiskReportTimeTest(RedpandaTest):
topics = (TopicSpec(segment_bytes=2**20,
Expand Down

0 comments on commit 3b09748

Please sign in to comment.