From 821fcc63df766611614fc846b331c96460e52cd6 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Mon, 5 Feb 2024 16:44:27 +0100 Subject: [PATCH] Add metrics to stresstest and optimize `cadence` usage (#1360) I added a local UDP sink to fully test metrics submission. It turns out the default `cadence` sink and the UDP submissions it does are *extremely* slow (at least on my machine). Switching to the queueing and buffering sink speeds up the stresstest considerably. ``` Before (default UDP sink): Workload 0 (concurrency: 10): 176 operations, 11.733334 ops/s Workload 1 (concurrency: 10): 137 operations, 9.133333 ops/s Workload 2 (concurrency: 10): 190 operations, 12.666667 ops/s Workload 3 (concurrency: 100): 169 operations, 11.266666 ops/s Workload 4 (concurrency: 100): 388 operations, 25.866667 ops/s Workload 5 (concurrency: 50): 563 operations, 37.533333 ops/s After (queuing + buffered UDP sink): Workload 0 (concurrency: 10): 2613 operations, 174.2 ops/s Workload 1 (concurrency: 10): 1978 operations, 131.86667 ops/s Workload 2 (concurrency: 10): 3681 operations, 245.4 ops/s Workload 3 (concurrency: 100): 21564 operations, 1437.6 ops/s Workload 4 (concurrency: 100): 10633 operations, 708.86664 ops/s Workload 5 (concurrency: 50): 20848 operations, 1389.8667 ops/s ``` That is a 10x-100x improvement depending on the stresstest. 
--- Cargo.lock | 1 + crates/symbolicator-service/src/metrics.rs | 7 +++--- crates/symbolicator-stress/Cargo.toml | 3 +++ crates/symbolicator-stress/src/logging.rs | 24 ++++++++++++++++---- crates/symbolicator-stress/src/main.rs | 17 ++++++++++---- crates/symbolicator-stress/src/stresstest.rs | 2 +- 6 files changed, 41 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 849f964ec..434988939 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4504,6 +4504,7 @@ dependencies = [ "clap", "futures", "humantime", + "jemallocator", "sentry", "serde", "serde_json", diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs index 0283f5db3..70e651db5 100644 --- a/crates/symbolicator-service/src/metrics.rs +++ b/crates/symbolicator-service/src/metrics.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use std::net::ToSocketAddrs; use std::sync::OnceLock; -use cadence::{StatsdClient, UdpMetricSink}; +use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient}; static METRICS_CLIENT: OnceLock = OnceLock::new(); @@ -20,8 +20,9 @@ pub fn configure_statsd(prefix: &str, host: A, tags: BTreeMap< } let socket = std::net::UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_nonblocking(true).unwrap(); - let sink = UdpMetricSink::from(&addrs[..], socket).unwrap(); - let mut builder = StatsdClient::builder(prefix, sink); + let udp_sink = BufferedUdpMetricSink::from(&addrs[..], socket).unwrap(); + let queuing_sink = QueuingMetricSink::from(udp_sink); + let mut builder = StatsdClient::builder(prefix, queuing_sink); for (key, value) in tags { builder = builder.with_tag(key, value) } diff --git a/crates/symbolicator-stress/Cargo.toml b/crates/symbolicator-stress/Cargo.toml index e1cf12222..72475e14e 100644 --- a/crates/symbolicator-stress/Cargo.toml +++ b/crates/symbolicator-stress/Cargo.toml @@ -21,3 +21,6 @@ symbolicator-test = { path = "../symbolicator-test" } tempfile = "3.2.0" tokio = { version = "1.24.2", features = 
["rt-multi-thread", "macros", "time", "sync"] } tracing-subscriber = "0.3.17" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = { version = "0.5", features = ["unprefixed_malloc_on_supported_platforms"] } diff --git a/crates/symbolicator-stress/src/logging.rs b/crates/symbolicator-stress/src/logging.rs index 71e240586..2a50a2a1c 100644 --- a/crates/symbolicator-stress/src/logging.rs +++ b/crates/symbolicator-stress/src/logging.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use std::env; use std::future::Future; use std::io::Write; -use std::net::{SocketAddr, TcpListener}; +use std::net::{SocketAddr, TcpListener, UdpSocket}; use std::pin::Pin; use symbolicator_service::metrics; @@ -21,8 +21,8 @@ pub struct Config { #[derive(Default)] pub struct Guard { sentry: Option, - pub sentry_server: Option + Send>>>, - // TODO: return the ports / futures of the http / upd servers to use + pub http_sink: Option + Send>>>, + pub udp_sink: Option + Send>>>, } pub fn init(config: Config) -> Guard { @@ -38,7 +38,7 @@ pub fn init(config: Config) -> Guard { listener.set_nonblocking(true).unwrap(); let socket = listener.local_addr().unwrap(); - guard.sentry_server = Some(Box::pin(async move { + guard.http_sink = Some(Box::pin(async move { async fn ok() -> &'static str { "OK" } @@ -86,7 +86,21 @@ pub fn init(config: Config) -> Guard { } if config.metrics { - let host = "TODO"; // create a *real* noop udp server to send metrics to + let addr = SocketAddr::from(([127, 0, 0, 1], 0)); + let listener = UdpSocket::bind(addr).unwrap(); + listener.set_nonblocking(true).unwrap(); + let socket = listener.local_addr().unwrap(); + + guard.udp_sink = Some(Box::pin(async move { + let listener = tokio::net::UdpSocket::from_std(listener).unwrap(); + let mut buf = Vec::with_capacity(1024); + loop { + buf.clear(); + let _len = listener.recv_buf(&mut buf).await.unwrap(); + } + })); + + let host = format!("127.0.0.1:{}", socket.port()); // have some default tags, just to be 
closer to the real world config let mut tags = BTreeMap::new(); diff --git a/crates/symbolicator-stress/src/main.rs b/crates/symbolicator-stress/src/main.rs index 3bafdca9e..6280a9cfb 100644 --- a/crates/symbolicator-stress/src/main.rs +++ b/crates/symbolicator-stress/src/main.rs @@ -14,6 +14,13 @@ mod workloads; use stresstest::perform_stresstest; use workloads::WorkloadsConfig; +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + /// Command line interface parser. #[derive(Parser)] struct Cli { @@ -47,8 +54,7 @@ fn main() -> Result<()> { backtraces: true, tracing: true, sentry: true, - // TODO: init metrics - ..Default::default() + metrics: true, }); let megs = 1024 * 1024; @@ -57,8 +63,11 @@ fn main() -> Result<()> { .thread_stack_size(8 * megs) .build()?; - if let Some(sentry_server) = logging_guard.sentry_server.take() { - runtime.spawn(sentry_server); + if let Some(http_sink) = logging_guard.http_sink.take() { + runtime.spawn(http_sink); + } + if let Some(udp) = logging_guard.udp_sink.take() { + runtime.spawn(udp); } runtime.block_on(perform_stresstest(service_config, workloads, cli.duration))?; diff --git a/crates/symbolicator-stress/src/stresstest.rs b/crates/symbolicator-stress/src/stresstest.rs index ea4e35083..d04189042 100644 --- a/crates/symbolicator-stress/src/stresstest.rs +++ b/crates/symbolicator-stress/src/stresstest.rs @@ -126,7 +126,7 @@ pub async fn perform_stresstest( let (concurrency, ops) = task.unwrap(); let ops_ps = ops as f32 / duration.as_secs() as f32; - println!("Workload {i} (concurrency: {concurrency}): {ops} operations, {ops_ps} ops/s"); + println!("Workload {i} (concurrency: {concurrency}): {ops} operations, {ops_ps:.2} ops/s"); } Ok(())