Skip to content

Commit

Permalink
schema_registry: Add cache metrics
Browse files Browse the repository at this point in the history
Add `schema_registry_cache_subject_version_count`

Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Nov 19, 2024
1 parent 7434dd8 commit a8b6f46
Showing 1 changed file with 56 additions and 5 deletions.
61 changes: 56 additions & 5 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/types.h"

#include <seastar/core/metrics.hh>

#include <absl/algorithm/container.h>
#include <absl/container/btree_map.h>
#include <absl/container/btree_set.h>
#include <absl/container/node_hash_map.h>

#include <optional>
#include <ranges>

namespace pandaproxy::schema_registry {
Expand Down Expand Up @@ -551,7 +554,7 @@ class store {
result<bool>
set_mode(seq_marker marker, const subject& sub, mode m, force f) {
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
auto& sub_entry = _subjects[sub];
auto& sub_entry = get_or_create_subject_entry(sub);
sub_entry.written_at.push_back(marker);
return std::exchange(sub_entry.mode, m) != m;
}
Expand Down Expand Up @@ -597,7 +600,7 @@ class store {
seq_marker marker,
const subject& sub,
compatibility_level compatibility) {
auto& sub_entry = _subjects[sub];
auto& sub_entry = get_or_create_subject_entry(sub);
sub_entry.written_at.push_back(marker);
return std::exchange(sub_entry.compatibility, compatibility)
!= compatibility;
Expand Down Expand Up @@ -645,7 +648,7 @@ class store {
bool inserted;
};
insert_subject_result insert_subject(subject sub, schema_id id) {
auto& subject_entry = _subjects[std::move(sub)];
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
subject_entry.deleted = is_deleted::no;
auto& versions = subject_entry.versions;
const auto v_it = std::find_if(
Expand All @@ -669,7 +672,7 @@ class store {
schema_version version,
schema_id id,
is_deleted deleted) {
auto& subject_entry = _subjects[std::move(sub)];
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
auto& versions = subject_entry.versions;
subject_entry.written_at.push_back(marker);

Expand Down Expand Up @@ -782,17 +785,65 @@ class store {
canonical_schema_definition definition;
};

struct subject_entry {
class subject_entry {
public:
explicit subject_entry(const subject& sub) { setup_metrics(sub); }
std::optional<compatibility_level> compatibility;
std::optional<mode> mode;
std::vector<subject_version_entry> versions;
is_deleted deleted{false};

std::vector<seq_marker> written_at;

private:
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;

void setup_metrics(const subject& sub) {
namespace sm = ss::metrics;
auto group_name = prometheus_sanitize::metrics_name(
"schema_registry_cache");
const auto make_subject_version_count = [this,
&sub](is_deleted deleted) {
return sm::make_gauge(
"subject_version_count",
[this, deleted] {
return std::ranges::count_if(
versions, [deleted](const subject_version_entry& v) {
return v.deleted == deleted;
});
},
sm::description("The number of versions in the subject"),
{
sm::label{"subject"}(sub),
sm::label{"deleted"}(deleted),
});
};
if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
group_name,
{make_subject_version_count(is_deleted::no),
make_subject_version_count(is_deleted::yes)},
{},
{sm::shard_label});
}
if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(
group_name,
{make_subject_version_count(is_deleted::no)
.aggregate({sm::shard_label}),
make_subject_version_count(is_deleted::yes)
.aggregate({sm::shard_label})});
}
}
};
using schema_map = absl::btree_map<schema_id, schema_entry>;
using subject_map = absl::node_hash_map<subject, subject_entry>;

subject_entry& get_or_create_subject_entry(subject sub) {
return _subjects.try_emplace(sub, sub).first->second;
}

result<subject_map::iterator>
get_subject_iter(const subject& sub, include_deleted inc_del) {
const store* const_this = this;
Expand Down

0 comments on commit a8b6f46

Please sign in to comment.