diff --git a/src/v/pandaproxy/probe.cc b/src/v/pandaproxy/probe.cc index 3cbcc2a21db55..a5dd7bb237d3f 100644 --- a/src/v/pandaproxy/probe.cc +++ b/src/v/pandaproxy/probe.cc @@ -50,7 +50,7 @@ void probe::setup_metrics() { return _request_metrics.hist().internal_histogram_logform(); })}, {}, - {sm::shard_label, operation_label}); + {sm::shard_label}); } void probe::setup_public_metrics() { diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 53a73a18892a8..e4a3e7c2c4865 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -11,15 +11,23 @@ #pragma once +#include "config/configuration.h" #include "container/fragmented_vector.h" +#include "metrics/metrics.h" +#include "metrics/prometheus_sanitize.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/types.h" +#include + #include #include #include #include +#include +#include + namespace pandaproxy::schema_registry { ///\brief A mapping of version and schema id for a subject. @@ -58,10 +66,13 @@ class store { public: using schema_id_set = absl::btree_set; - explicit store() = default; + explicit store() + : store(is_mutable::no) {} explicit store(is_mutable mut) - : _mutable(mut) {} + : _mutable(mut) { + setup_metrics(); + } struct insert_result { schema_version version; @@ -543,7 +554,7 @@ class store { result set_mode(seq_marker marker, const subject& sub, mode m, force f) { BOOST_OUTCOME_TRYX(check_mode_mutability(f)); - auto& sub_entry = _subjects[sub]; + auto& sub_entry = get_or_create_subject_entry(sub); sub_entry.written_at.push_back(marker); return std::exchange(sub_entry.mode, m) != m; } @@ -589,7 +600,7 @@ class store { seq_marker marker, const subject& sub, compatibility_level compatibility) { - auto& sub_entry = _subjects[sub]; + auto& sub_entry = get_or_create_subject_entry(sub); sub_entry.written_at.push_back(marker); return std::exchange(sub_entry.compatibility, compatibility) != compatibility; @@ -637,7 +648,7 @@ class store { bool inserted; }; insert_subject_result insert_subject(subject sub, schema_id id) { - auto& subject_entry = _subjects[std::move(sub)]; + auto& subject_entry = get_or_create_subject_entry(std::move(sub)); subject_entry.deleted = is_deleted::no; auto& versions = subject_entry.versions; const auto v_it = std::find_if( @@ -661,7 +672,7 @@ class store { schema_version version, schema_id id, is_deleted deleted) { - auto& subject_entry = _subjects[std::move(sub)]; + auto& subject_entry = get_or_create_subject_entry(std::move(sub)); auto& versions = subject_entry.versions; subject_entry.written_at.push_back(marker); @@ -705,6 +716,67 @@ class store { return outcome::success(); } + void setup_metrics() { + namespace sm = ss::metrics; + const auto make_schema_count = [this]() { + return sm::make_gauge( + "schema_count", + [this] { return _schemas.size(); }, + sm::description("The number of schemas in the store")); + }; + const auto make_subject_count = [this](is_deleted deleted) { + return sm::make_gauge( + "subject_count", + [this, deleted] { + return std::ranges::count_if( + _subjects, [deleted](const auto& entry) { + return entry.second.deleted == deleted; + }); + }, + sm::description("The number of subjects in the store"), + {sm::label{"deleted"}(deleted)}); + }; + const auto make_schema_bytes = [this]() { + return sm::make_gauge( + "schema_memory_bytes", + [this] { + return absl::c_accumulate( + _schemas | std::views::transform([](const auto& s) { + return s.second.definition.raw()().size_bytes(); + }), + size_t{0}); + }, + sm::description("The memory usage of schemas in the store")); + }; + auto group_name = prometheus_sanitize::metrics_name( + "schema_registry_cache"); + const std::vector agg{{sm::shard_label}}; + + if (!config::shard_local_cfg().disable_metrics()) { + _metrics.add_group( + group_name, + { + make_schema_count(), + make_schema_bytes(), + make_subject_count(is_deleted::no), + make_subject_count(is_deleted::yes), + }, + {}, + agg); + } + + if (!config::shard_local_cfg().disable_public_metrics()) { + _public_metrics.add_group( + group_name, + { + make_schema_count().aggregate(agg), + make_schema_bytes().aggregate(agg), + make_subject_count(is_deleted::no).aggregate(agg), + make_subject_count(is_deleted::yes).aggregate(agg), + }); + } + }; + private: struct schema_entry { explicit schema_entry(canonical_schema_definition definition) @@ -713,17 +785,65 @@ class store { canonical_schema_definition definition; }; - struct subject_entry { + class subject_entry { + public: + explicit subject_entry(const subject& sub) { setup_metrics(sub); } std::optional compatibility; std::optional mode; std::vector versions; is_deleted deleted{false}; std::vector written_at; + + private: + metrics::internal_metric_groups _metrics; + metrics::public_metric_groups _public_metrics; + + void setup_metrics(const subject& sub) { + namespace sm = ss::metrics; + auto group_name = prometheus_sanitize::metrics_name( + "schema_registry_cache"); + const auto make_subject_version_count = [this, + &sub](is_deleted deleted) { + return sm::make_gauge( + "subject_version_count", + [this, deleted] { + return std::ranges::count_if( + versions, [deleted](const subject_version_entry& v) { + return v.deleted == deleted; + }); + }, + sm::description("The number of versions in the subject"), + { + sm::label{"subject"}(sub), + sm::label{"deleted"}(deleted), + }); + }; + if (!config::shard_local_cfg().disable_metrics()) { + _metrics.add_group( + group_name, + {make_subject_version_count(is_deleted::no), + make_subject_version_count(is_deleted::yes)}, + {}, + {sm::shard_label}); + } + if (!config::shard_local_cfg().disable_public_metrics()) { + _public_metrics.add_group( + group_name, + {make_subject_version_count(is_deleted::no) + .aggregate({sm::shard_label}), + make_subject_version_count(is_deleted::yes) + .aggregate({sm::shard_label})}); + } + } }; using schema_map = absl::btree_map; using subject_map = absl::node_hash_map; + subject_entry& get_or_create_subject_entry(subject sub) { + return _subjects.try_emplace(sub, sub).first->second; + } + result get_subject_iter(const subject& sub, include_deleted inc_del) { const store* const_this = this; @@ -781,7 +901,9 @@ class store { subject_map _subjects; compatibility_level _compatibility{compatibility_level::backward}; mode _mode{mode::read_write}; - is_mutable _mutable{is_mutable::no}; + is_mutable _mutable; + metrics::internal_metric_groups _metrics; + metrics::public_metric_groups _public_metrics; }; } // namespace pandaproxy::schema_registry