From 580f0a632f10ca2a3ca3d890a76447c242a5ea01 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 23 Oct 2023 10:53:37 -0400 Subject: [PATCH] adds metrics to repair QUIC endpoint --- core/src/repair/quic_endpoint.rs | 332 +++++++++++++++++++++++++++++-- 1 file changed, 311 insertions(+), 21 deletions(-) diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index 031de37f94b5a0..2c5e954a626c74 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -6,8 +6,8 @@ use { log::error, quinn::{ ClientConfig, ConnectError, Connecting, Connection, ConnectionError, Endpoint, - EndpointConfig, ReadToEndError, RecvStream, SendStream, ServerConfig, TokioRuntime, - TransportConfig, VarInt, WriteError, + EndpointConfig, ReadError, ReadToEndError, RecvStream, SendStream, ServerConfig, + TokioRuntime, TransportConfig, VarInt, WriteError, }, rcgen::RcgenError, rustls::{Certificate, PrivateKey}, @@ -24,7 +24,7 @@ use { io::{Cursor, Error as IoError}, net::{IpAddr, SocketAddr, UdpSocket}, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, time::Duration, @@ -82,16 +82,14 @@ pub struct RemoteRequest { #[derive(Error, Debug)] #[allow(clippy::enum_variant_names)] pub(crate) enum Error { - #[error(transparent)] - BincodeError(#[from] bincode::Error), #[error(transparent)] CertificateError(#[from] RcgenError), + #[error("Channel Send Error")] + ChannelSendError, #[error(transparent)] ConnectError(#[from] ConnectError), #[error(transparent)] ConnectionError(#[from] ConnectionError), - #[error("Channel Send Error")] - ChannelSendError, #[error("Invalid Identity: {0:?}")] InvalidIdentity(SocketAddr), #[error(transparent)] @@ -103,9 +101,15 @@ pub(crate) enum Error { #[error("read_to_end Timeout")] ReadToEndTimeout, #[error(transparent)] - WriteError(#[from] WriteError), - #[error(transparent)] TlsError(#[from] rustls::Error), + #[error(transparent)] + WriteError(#[from] WriteError), +} + +macro_rules! add_metric { + ($metric: expr) => {{ + $metric.fetch_add(1, Ordering::Relaxed); + }}; } #[allow(clippy::type_complexity)] @@ -207,8 +211,11 @@ async fn run_server( router: Arc>>>, cache: Arc>>, ) { + let stats = Arc::::default(); + let report_metrics_task = + tokio::task::spawn(report_metrics_task("repair_quic_server", stats.clone())); while let Some(connecting) = endpoint.accept().await { - tokio::task::spawn(handle_connecting_error( + tokio::task::spawn(handle_connecting_task( endpoint.clone(), connecting, remote_request_sender.clone(), @@ -216,8 +223,10 @@ async fn run_server( prune_cache_pending.clone(), router.clone(), cache.clone(), + stats.clone(), )); } + report_metrics_task.abort(); } async fn run_client( @@ -229,14 +238,17 @@ async fn run_client( router: Arc>>>, cache: Arc>>, ) { + let stats = Arc::::default(); + let report_metrics_task = + tokio::task::spawn(report_metrics_task("repair_quic_client", stats.clone())); while let Some(request) = receiver.recv().await { - let Some(request) = try_route_request(request, &*router.read().await) else { + let Some(request) = try_route_request(request, &*router.read().await, &stats) else { continue; }; let remote_address = request.remote_address; let receiver = { let mut router = router.write().await; - let Some(request) = try_route_request(request, &router) else { + let Some(request) = try_route_request(request, &router, &stats) else { continue; }; let (sender, receiver) = tokio::sync::mpsc::channel(ROUTER_CHANNEL_BUFFER); @@ -253,11 +265,13 @@ async fn run_client( prune_cache_pending.clone(), router.clone(), cache.clone(), + stats.clone(), )); } close_quic_endpoint(&endpoint); // Drop sender channels to unblock threads waiting on the receiving end. router.write().await.clear(); + report_metrics_task.abort(); } // Routes the local request to respective channel. Drops the request if the @@ -266,13 +280,15 @@ async fn run_client( fn try_route_request( request: LocalRequest, router: &HashMap>, + stats: &RepairQuicStats, ) -> Option { match router.get(&request.remote_address) { None => Some(request), Some(sender) => match sender.try_send(request) { Ok(()) => None, Err(TrySendError::Full(request)) => { - error!("TrySendError::Full {}", request.remote_address); + debug!("TrySendError::Full {}", request.remote_address); + add_metric!(stats.router_try_send_error_full); None } Err(TrySendError::Closed(request)) => Some(request), @@ -280,7 +296,7 @@ fn try_route_request( } } -async fn handle_connecting_error( +async fn handle_connecting_task( endpoint: Endpoint, connecting: Connecting, remote_request_sender: Sender, @@ -288,6 +304,7 @@ async fn handle_connecting_error( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) { if let Err(err) = handle_connecting( endpoint, @@ -297,10 +314,12 @@ async fn handle_connecting_error( prune_cache_pending, router, cache, + stats.clone(), ) .await { - error!("handle_connecting: {err:?}"); + debug!("handle_connecting: {err:?}"); + record_error(&err, &stats); } } @@ -312,6 +331,7 @@ async fn handle_connecting( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) -> Result<(), Error> { let connection = connecting.await?; let remote_address = connection.remote_address(); @@ -332,6 +352,7 @@ async fn handle_connecting( prune_cache_pending, router, cache, + stats, ) .await; Ok(()) @@ -349,6 +370,7 @@ async fn handle_connection( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) { cache_connection( remote_pubkey, @@ -361,8 +383,10 @@ async fn handle_connection( .await; let send_requests_task = tokio::task::spawn(send_requests_task( endpoint.clone(), + remote_address, connection.clone(), receiver, + stats.clone(), )); let recv_requests_task = tokio::task::spawn(recv_requests_task( endpoint, @@ -370,11 +394,13 @@ async fn handle_connection( remote_pubkey, connection.clone(), remote_request_sender, + stats.clone(), )); match futures::future::try_join(send_requests_task, recv_requests_task).await { Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"), - Ok(((), Err(ref err))) => { - error!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + Ok(((), Err(err))) => { + debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(&err, &stats); } Ok(((), Ok(()))) => (), } @@ -392,6 +418,7 @@ async fn recv_requests_task( remote_pubkey: Pubkey, connection: Connection, remote_request_sender: Sender, + stats: Arc, ) -> Result<(), Error> { loop { let (send_stream, recv_stream) = connection.accept_bi().await?; @@ -402,6 +429,7 @@ async fn recv_requests_task( send_stream, recv_stream, remote_request_sender.clone(), + stats.clone(), )); } } @@ -413,6 +441,7 @@ async fn handle_streams_task( send_stream: SendStream, recv_stream: RecvStream, remote_request_sender: Sender, + stats: Arc, ) { if let Err(err) = handle_streams( &endpoint, @@ -424,7 +453,8 @@ async fn handle_streams_task( ) .await { - error!("handle_stream: {remote_address}, {remote_pubkey}, {err:?}"); + debug!("handle_stream: {remote_address}, {remote_pubkey}, {err:?}"); + record_error(&err, &stats); } } @@ -469,21 +499,32 @@ async fn handle_streams( async fn send_requests_task( endpoint: Endpoint, + remote_address: SocketAddr, connection: Connection, mut receiver: AsyncReceiver, + stats: Arc, ) { while let Some(request) = receiver.recv().await { tokio::task::spawn(send_request_task( endpoint.clone(), + remote_address, connection.clone(), request, + stats.clone(), )); } } -async fn send_request_task(endpoint: Endpoint, connection: Connection, request: LocalRequest) { +async fn send_request_task( + endpoint: Endpoint, + remote_address: SocketAddr, + connection: Connection, + request: LocalRequest, + stats: Arc, +) { if let Err(err) = send_request(endpoint, connection, request).await { - error!("send_request: {err:?}") + debug!("send_request: {remote_address}, {err:?}"); + record_error(&err, &stats); } } @@ -542,6 +583,7 @@ async fn make_connection_task( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) { if let Err(err) = make_connection( endpoint, @@ -552,10 +594,12 @@ async fn make_connection_task( prune_cache_pending, router, cache, + stats.clone(), ) .await { - error!("make_connection: {remote_address}, {err:?}"); + debug!("make_connection: {remote_address}, {err:?}"); + record_error(&err, &stats); } } @@ -568,6 +612,7 @@ async fn make_connection( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) -> Result<(), Error> { let connection = endpoint .connect(remote_address, CONNECT_SERVER_NAME)? @@ -583,6 +628,7 @@ async fn make_connection( prune_cache_pending, router, cache, + stats, ) .await; Ok(()) @@ -698,6 +744,250 @@ impl From> for Error { } } +#[derive(Default)] +struct RepairQuicStats { + connect_error_invalid_remote_address: AtomicU64, + connect_error_other: AtomicU64, + connect_error_too_many_connections: AtomicU64, + connection_error_application_closed: AtomicU64, + connection_error_connection_closed: AtomicU64, + connection_error_locally_closed: AtomicU64, + connection_error_reset: AtomicU64, + connection_error_timed_out: AtomicU64, + connection_error_transport_error: AtomicU64, + connection_error_version_mismatch: AtomicU64, + invalid_identity: AtomicU64, + no_response_received: AtomicU64, + read_to_end_error_connection_lost: AtomicU64, + read_to_end_error_illegal_ordered_read: AtomicU64, + read_to_end_error_reset: AtomicU64, + read_to_end_error_too_long: AtomicU64, + read_to_end_error_unknown_stream: AtomicU64, + read_to_end_error_zero_rtt_rejected: AtomicU64, + read_to_end_timeout: AtomicU64, + router_try_send_error_full: AtomicU64, + write_error_connection_lost: AtomicU64, + write_error_stopped: AtomicU64, + write_error_unknown_stream: AtomicU64, + write_error_zero_rtt_rejected: AtomicU64, +} + +async fn report_metrics_task(name: &'static str, stats: Arc) { + const METRICS_SUBMIT_CADENCE: Duration = Duration::from_secs(2); + loop { + tokio::time::sleep(METRICS_SUBMIT_CADENCE).await; + report_metrics(name, &stats); + } +} + +fn record_error(err: &Error, stats: &RepairQuicStats) { + match err { + Error::CertificateError(_) => (), + Error::ChannelSendError => (), + Error::ConnectError(ConnectError::EndpointStopping) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectError(ConnectError::TooManyConnections) => { + add_metric!(stats.connect_error_too_many_connections) + } + Error::ConnectError(ConnectError::InvalidDnsName(_)) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectError(ConnectError::InvalidRemoteAddress(_)) => { + add_metric!(stats.connect_error_invalid_remote_address) + } + Error::ConnectError(ConnectError::NoDefaultClientConfig) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectError(ConnectError::UnsupportedVersion) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectionError(ConnectionError::VersionMismatch) => { + add_metric!(stats.connection_error_version_mismatch) + } + Error::ConnectionError(ConnectionError::TransportError(_)) => { + add_metric!(stats.connection_error_transport_error) + } + Error::ConnectionError(ConnectionError::ConnectionClosed(_)) => { + add_metric!(stats.connection_error_connection_closed) + } + Error::ConnectionError(ConnectionError::ApplicationClosed(_)) => { + add_metric!(stats.connection_error_application_closed) + } + Error::ConnectionError(ConnectionError::Reset) => add_metric!(stats.connection_error_reset), + Error::ConnectionError(ConnectionError::TimedOut) => { + add_metric!(stats.connection_error_timed_out) + } + Error::ConnectionError(ConnectionError::LocallyClosed) => { + add_metric!(stats.connection_error_locally_closed) + } + Error::InvalidIdentity(_) => add_metric!(stats.invalid_identity), + Error::IoError(_) => (), + Error::NoResponseReceived => add_metric!(stats.no_response_received), + Error::ReadToEndError(ReadToEndError::Read(ReadError::Reset(_))) => { + add_metric!(stats.read_to_end_error_reset) + } + Error::ReadToEndError(ReadToEndError::Read(ReadError::ConnectionLost(_))) => { + add_metric!(stats.read_to_end_error_connection_lost) + } + Error::ReadToEndError(ReadToEndError::Read(ReadError::UnknownStream)) => { + add_metric!(stats.read_to_end_error_unknown_stream) + } + Error::ReadToEndError(ReadToEndError::Read(ReadError::IllegalOrderedRead)) => { + add_metric!(stats.read_to_end_error_illegal_ordered_read) + } + Error::ReadToEndError(ReadToEndError::Read(ReadError::ZeroRttRejected)) => { + add_metric!(stats.read_to_end_error_zero_rtt_rejected) + } + Error::ReadToEndError(ReadToEndError::TooLong) => { + add_metric!(stats.read_to_end_error_too_long) + } + Error::ReadToEndTimeout => add_metric!(stats.read_to_end_timeout), + Error::TlsError(_) => (), + Error::WriteError(WriteError::Stopped(_)) => add_metric!(stats.write_error_stopped), + Error::WriteError(WriteError::ConnectionLost(_)) => { + add_metric!(stats.write_error_connection_lost) + } + Error::WriteError(WriteError::UnknownStream) => { + add_metric!(stats.write_error_unknown_stream) + } + Error::WriteError(WriteError::ZeroRttRejected) => { + add_metric!(stats.write_error_zero_rtt_rejected) + } + } +} + +fn report_metrics(name: &'static str, stats: &RepairQuicStats) { + macro_rules! reset_metric { + ($metric: expr) => { + $metric.swap(0, Ordering::Relaxed) + }; + } + datapoint_info!( + name, + ( + "connect_error_invalid_remote_address", + reset_metric!(stats.connect_error_invalid_remote_address), + i64 + ), + ( + "connect_error_other", + reset_metric!(stats.connect_error_other), + i64 + ), + ( + "connect_error_too_many_connections", + reset_metric!(stats.connect_error_too_many_connections), + i64 + ), + ( + "connection_error_application_closed", + reset_metric!(stats.connection_error_application_closed), + i64 + ), + ( + "connection_error_connection_closed", + reset_metric!(stats.connection_error_connection_closed), + i64 + ), + ( + "connection_error_locally_closed", + reset_metric!(stats.connection_error_locally_closed), + i64 + ), + ( + "connection_error_reset", + reset_metric!(stats.connection_error_reset), + i64 + ), + ( + "connection_error_timed_out", + reset_metric!(stats.connection_error_timed_out), + i64 + ), + ( + "connection_error_transport_error", + reset_metric!(stats.connection_error_transport_error), + i64 + ), + ( + "connection_error_version_mismatch", + reset_metric!(stats.connection_error_version_mismatch), + i64 + ), + ( + "invalid_identity", + reset_metric!(stats.invalid_identity), + i64 + ), + ( + "no_response_received", + reset_metric!(stats.no_response_received), + i64 + ), + ( + "read_to_end_error_connection_lost", + reset_metric!(stats.read_to_end_error_connection_lost), + i64 + ), + ( + "read_to_end_error_illegal_ordered_read", + reset_metric!(stats.read_to_end_error_illegal_ordered_read), + i64 + ), + ( + "read_to_end_error_reset", + reset_metric!(stats.read_to_end_error_reset), + i64 + ), + ( + "read_to_end_error_too_long", + reset_metric!(stats.read_to_end_error_too_long), + i64 + ), + ( + "read_to_end_error_unknown_stream", + reset_metric!(stats.read_to_end_error_unknown_stream), + i64 + ), + ( + "read_to_end_error_zero_rtt_rejected", + reset_metric!(stats.read_to_end_error_zero_rtt_rejected), + i64 + ), + ( + "read_to_end_timeout", + reset_metric!(stats.read_to_end_timeout), + i64 + ), + ( + "router_try_send_error_full", + reset_metric!(stats.router_try_send_error_full), + i64 + ), + ( + "write_error_connection_lost", + reset_metric!(stats.write_error_connection_lost), + i64 + ), + ( + "write_error_stopped", + reset_metric!(stats.write_error_stopped), + i64 + ), + ( + "write_error_unknown_stream", + reset_metric!(stats.write_error_unknown_stream), + i64 + ), + ( + "write_error_zero_rtt_rejected", + reset_metric!(stats.write_error_zero_rtt_rejected), + i64 + ), + ); +} + #[cfg(test)] mod tests { use {