From 3054c5a24b05c58999587d0b5cf33a347fd74469 Mon Sep 17 00:00:00 2001 From: Erin Power Date: Mon, 2 Dec 2024 13:46:11 +0100 Subject: [PATCH] perf: Use local histogram for processing time --- src/components/proxy/io_uring_shared.rs | 33 ++++++++++++++++--- src/components/proxy/packet_router.rs | 3 +- .../proxy/packet_router/reference.rs | 10 ++++-- src/metrics.rs | 22 +++++++++++-- 4 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index 40c1c8f68..9d2a4c6f4 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -24,13 +24,13 @@ use crate::{ components::proxy::{self, PendingSends, PipelineError, SendPacket}, metrics, pool::PoolBuffer, - time::UtcTimestamp, }; use io_uring::{squeue::Entry, types::Fd}; use socket2::SockAddr; use std::{ os::fd::{AsRawFd, FromRawFd}, sync::Arc, + time::Instant, }; /// A simple wrapper around [eventfd](https://man7.org/linux/man-pages/man2/eventfd.2.html) @@ -227,7 +227,8 @@ pub enum PacketProcessorCtx { fn process_packet( ctx: &mut PacketProcessorCtx, packet: RecvPacket, - last_received_at: &mut Option, + last_received_at: &mut Option, + processing_time: &prometheus::local::LocalHistogram, ) { match ctx { PacketProcessorCtx::Router { @@ -237,10 +238,10 @@ fn process_packet( error_acc, destinations, } => { - let received_at = UtcTimestamp::now(); + let received_at = Instant::now(); if let Some(last_received_at) = last_received_at { metrics::packet_jitter(metrics::READ, &metrics::EMPTY) - .set((received_at - *last_received_at).nanos()); + .set((received_at - *last_received_at).as_nanos() as _); } *last_received_at = Some(received_at); @@ -256,6 +257,7 @@ fn process_packet( sessions, error_acc, destinations, + processing_time, ); } PacketProcessorCtx::SessionPool { pool, port, .. } => { @@ -453,6 +455,8 @@ impl IoUringLoop { // Just double buffer the pending writes for simplicity let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity()); + let processing_metrics = metrics::ProcessingMetrics::new(); + // When sending packets, this is the direction used when updating metrics let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) { metrics::WRITE @@ -478,6 +482,8 @@ impl IoUringLoop { // onto the submission queue for the loop to actually function (ie, similar to await on futures) loop_ctx.sync(); + const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15); + let mut time_since_flush = std::time::Duration::default(); let mut last_received_at = None; // The core io uring loop @@ -520,7 +526,24 @@ impl IoUringLoop { } let packet = packet.finalize_recv(ret as usize); - process_packet(&mut ctx, packet, &mut last_received_at); + let old_received_at = last_received_at.clone(); + process_packet( + &mut ctx, + packet, + &mut last_received_at, + &processing_metrics.read_processing_time, + ); + + if let (Some(old_received_at), Some(last_received_at)) = + (&old_received_at, &last_received_at) + { + time_since_flush += *last_received_at - *old_received_at; + + if time_since_flush >= FLUSH_INTERVAL { + time_since_flush = <_>::default(); + processing_metrics.flush(); + } + } loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); } diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 5d54c99e2..29fc475cb 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -57,6 +57,7 @@ impl DownstreamReceiveWorkerConfig { sessions: &Arc, error_acc: &mut super::error::ErrorAccumulator, destinations: &mut Vec, + processing_time: &prometheus::local::LocalHistogram, ) { tracing::trace!( id = worker_id, @@ -65,7 +66,7 @@ impl DownstreamReceiveWorkerConfig { "received packet from downstream" ); - let timer = metrics::processing_time(metrics::READ).start_timer(); + let timer = processing_time.start_timer(); match Self::process_downstream_received_packet(packet, config, sessions, destinations) { Ok(()) => { error_acc.maybe_send(); diff --git a/src/components/proxy/packet_router/reference.rs b/src/components/proxy/packet_router/reference.rs index 694d5eae6..c9981901d 100644 --- a/src/components/proxy/packet_router/reference.rs +++ b/src/components/proxy/packet_router/reference.rs @@ -37,7 +37,7 @@ impl super::DownstreamReceiveWorkerConfig { let (tx, mut rx) = tokio::sync::oneshot::channel(); let worker = uring_spawn!(thread_span, async move { - let mut last_received_at = None; + let mut last_received_at: Option = None; let socket = crate::net::DualStackLocalSocket::new(port) .unwrap() .make_refcnt(); @@ -102,6 +102,7 @@ impl super::DownstreamReceiveWorkerConfig { let mut error_acc = crate::components::proxy::error::ErrorAccumulator::new(error_sender); let mut destinations = Vec::with_capacity(1); + let processing_metrics = crate::metrics::ProcessingMetrics::new(); loop { // Initialize a buffer for the UDP packet. We use the maximum size of a UDP @@ -110,7 +111,7 @@ impl super::DownstreamReceiveWorkerConfig { tokio::select! { received = socket.recv_from(buffer) => { - let received_at = crate::time::UtcTimestamp::now(); + let received_at = std::time::Instant::now(); let (result, buffer) = received; match result { @@ -123,7 +124,7 @@ impl super::DownstreamReceiveWorkerConfig { crate::metrics::READ, &crate::metrics::EMPTY, ) - .set((received_at - last_received_at).nanos()); + .set((received_at - last_received_at).as_nanos() as _); } last_received_at = Some(received_at); @@ -134,7 +135,10 @@ impl super::DownstreamReceiveWorkerConfig { &sessions, &mut error_acc, &mut destinations, + &processing_metrics.read_processing_time, ); + + processing_metrics.flush(); } Err(error) => { tracing::error!(%error, "error receiving packet"); diff --git a/src/metrics.rs b/src/metrics.rs index 75019a8a6..a028ee2f3 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -17,8 +17,8 @@ use crate::net::maxmind_db::MetricsIpNetEntry; use once_cell::sync::Lazy; use prometheus::{ - core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, - IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS, + core::Collector, local::LocalHistogram, Histogram, HistogramOpts, HistogramVec, IntCounter, + IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS, }; pub use prometheus::Result; @@ -282,6 +282,24 @@ pub trait CollectorExt: Collector + Clone + Sized + 'static { impl CollectorExt for C {} +/// A local instance of all of the metrics related to packet processing. +pub struct ProcessingMetrics { + pub read_processing_time: LocalHistogram, +} + +impl ProcessingMetrics { + pub fn new() -> Self { + Self { + read_processing_time: processing_time(READ).local(), + } + } + + #[inline] + pub fn flush(&self) { + self.read_processing_time.flush(); + } +} + #[cfg(test)] mod test { fn check(num: u64, exp: &str) {