Skip to content

Commit

Permalink
instr(metrics): Instrument metric delays
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Dec 18, 2024
1 parent 1dc9941 commit 535d81e
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 3 deletions.
1 change: 1 addition & 0 deletions relay-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@ mod view;
pub use bucket::*;
pub use finite::*;
pub use protocol::*;
pub use utils::ByNamespace;
pub use view::*;
35 changes: 35 additions & 0 deletions relay-metrics/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,25 @@ pub fn tags_cost(tags: &BTreeMap<String, String>) -> usize {
tags.iter().map(|(k, v)| k.len() + v.len()).sum()
}

/// Utility to store information for each [`MetricNamespace`].
///
/// Holds exactly one value of type `T` per namespace, each in its own public field.
///
/// The comparison and hashing traits are derived, so whenever `T` supports them the fields
/// are processed in declaration order. Reordering the fields therefore changes the derived
/// ordering.
#[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ByNamespace<T> {
/// Value for the [`MetricNamespace::Sessions`] namespace.
pub sessions: T,
/// Value for the [`MetricNamespace::Transactions`] namespace.
pub transactions: T,
/// Value for the [`MetricNamespace::Spans`] namespace.
pub spans: T,
/// Value for the [`MetricNamespace::Custom`] namespace.
pub custom: T,
/// Value for the [`MetricNamespace::Stats`] namespace.
pub stats: T,
/// Value for the [`MetricNamespace::Unsupported`] namespace.
pub unsupported: T,
}

impl<T> ByNamespace<T> {
/// Returns a reference for the value stored for `namespace`.
pub fn get(&self, namespace: MetricNamespace) -> &T {
match namespace {
MetricNamespace::Sessions => &self.sessions,
Expand All @@ -34,6 +42,7 @@ impl<T> ByNamespace<T> {
}
}

/// Returns a mutable reference for the value stored for `namespace`.
pub fn get_mut(&mut self, namespace: MetricNamespace) -> &mut T {
match namespace {
MetricNamespace::Sessions => &mut self.sessions,
Expand All @@ -46,6 +55,32 @@ impl<T> ByNamespace<T> {
}
}

impl<T> IntoIterator for ByNamespace<T> {
type Item = (MetricNamespace, T);
type IntoIter = std::array::IntoIter<(MetricNamespace, T), 6>;

fn into_iter(self) -> Self::IntoIter {
let Self {
sessions,
transactions,
spans,
custom,
stats,
unsupported,
} = self;

[
(MetricNamespace::Sessions, sessions),
(MetricNamespace::Transactions, transactions),
(MetricNamespace::Spans, spans),
(MetricNamespace::Custom, custom),
(MetricNamespace::Stats, stats),
(MetricNamespace::Unsupported, unsupported),
]
.into_iter()
}
}

impl<T: fmt::Debug + Default + PartialEq> fmt::Debug for ByNamespace<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// A more compact representation. Mainly for snapshot testing.
Expand Down
32 changes: 29 additions & 3 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};

use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{
Bucket, BucketView, BucketViewValue, BucketsView, FiniteF64, GaugeValue, MetricName,
MetricNamespace, SetView,
Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, FiniteF64, GaugeValue,
MetricName, MetricNamespace, SetView,
};
use relay_quotas::Scoping;
use relay_statsd::metric;
Expand All @@ -37,7 +37,7 @@ use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayTimers};
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{FormDataIter, ThreadPool, TypedEnvelope, WorkerGroup};

/// Fallback name used for attachment items without a `filename` header.
Expand Down Expand Up @@ -390,9 +390,20 @@ impl StoreService {
let global_config = self.global_config.current();
let mut encoder = BucketEncoder::new(&global_config);

let now = UnixTimestamp::now();
let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

for mut bucket in buckets {
let namespace = encoder.prepare(&mut bucket);

if let Some(received_at) = bucket.metadata.received_at {
let delay = now.as_secs().saturating_sub(received_at.as_secs());
let (total, count, max) = delay_stats.get_mut(namespace);
*total += delay;
*count += 1;
*max = (*max).max(delay);
}

// Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
// each bucket separately, we only need to split buckets that exceed the size, but not
// batches.
Expand Down Expand Up @@ -430,6 +441,21 @@ impl StoreService {
"failed to produce metric buckets: {error}"
);
}

for (namespace, (total, count, max)) in delay_stats {
metric!(
counter(RelayCounters::MetricDelaySum) += total,
namespace = namespace.as_str()
);
metric!(
counter(RelayCounters::MetricDelayCount) += count,
namespace = namespace.as_str()
);
metric!(
gauge(RelayGauges::MetricDelayMax) = max,
namespace = namespace.as_str()
);
}
}

fn create_metric_message<'a>(
Expand Down
32 changes: 32 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ pub enum RelayGauges {
ProjectCacheScheduledFetches,
/// Exposes the amount of currently open and handled connections by the server.
ServerActiveConnections,
/// Maximum delay of a metric bucket in seconds.
///
/// The maximum is measured from initial creation of the bucket in an internal Relay
/// until it is produced to Kafka.
///
/// This metric is tagged with:
/// - `namespace`: the metric namespace.
#[cfg(feature = "processing")]
MetricDelayMax,
}

impl GaugeMetric for RelayGauges {
Expand All @@ -52,6 +61,8 @@ impl GaugeMetric for RelayGauges {
}
RelayGauges::ProjectCacheScheduledFetches => "project_cache.fetches.size",
RelayGauges::ServerActiveConnections => "server.http.connections",
#[cfg(feature = "processing")]
RelayGauges::MetricDelayMax => "metrics.buckets.delay.max",
}
}
}
Expand Down Expand Up @@ -812,6 +823,23 @@ pub enum RelayCounters {
ServerSocketAccept,
/// Incremented every time the server aborts a connection because of an idle timeout.
ServerConnectionIdleTimeout,
/// The total delay of metric buckets in seconds.
///
/// The delay is measured from initial creation of the bucket in an internal Relay
/// until it is produced to Kafka.
///
/// Use [`Self::MetricDelayCount`] to calculate the average delay.
///
/// This metric is tagged with:
/// - `namespace`: the metric namespace.
#[cfg(feature = "processing")]
MetricDelaySum,
/// The number of buckets counted for the [`Self::MetricDelaySum`] metric.
///
/// This metric is tagged with:
/// - `namespace`: the metric namespace.
#[cfg(feature = "processing")]
MetricDelayCount,
}

impl CounterMetric for RelayCounters {
Expand Down Expand Up @@ -854,6 +882,10 @@ impl CounterMetric for RelayCounters {
RelayCounters::ReplayExceededSegmentLimit => "replay.segment_limit_exceeded",
RelayCounters::ServerSocketAccept => "server.http.accepted",
RelayCounters::ServerConnectionIdleTimeout => "server.http.idle_timeout",
#[cfg(feature = "processing")]
RelayCounters::MetricDelaySum => "metrics.delay.sum",
#[cfg(feature = "processing")]
RelayCounters::MetricDelayCount => "metrics.delay.count",
}
}
}
Expand Down

0 comments on commit 535d81e

Please sign in to comment.