Skip to content

Commit

Permalink
Add metrics to stresstest and optimize cadence usage (#1360)
Browse files Browse the repository at this point in the history
I added a local UDP sink to fully test metrics submission.
It turns out the default `cadence` sink and the UDP submissions it does are *extremely* slow (at least on my machine).

Switching to the queueing and buffering sink speeds up the stresstest considerably.

```
Workload 0 (concurrency: 10): 176 operations, 11.733334 ops/s
Workload 1 (concurrency: 10): 137 operations, 9.133333 ops/s
Workload 2 (concurrency: 10): 190 operations, 12.666667 ops/s
Workload 3 (concurrency: 100): 169 operations, 11.266666 ops/s
Workload 4 (concurrency: 100): 388 operations, 25.866667 ops/s
Workload 5 (concurrency: 50): 563 operations, 37.533333 ops/s

Workload 0 (concurrency: 10): 2613 operations, 174.2 ops/s
Workload 1 (concurrency: 10): 1978 operations, 131.86667 ops/s
Workload 2 (concurrency: 10): 3681 operations, 245.4 ops/s
Workload 3 (concurrency: 100): 21564 operations, 1437.6 ops/s
Workload 4 (concurrency: 100): 10633 operations, 708.86664 ops/s
Workload 5 (concurrency: 50): 20848 operations, 1389.8667 ops/s
```

That is a 10x-100x improvement depending on the stresstest.
  • Loading branch information
Swatinem authored Feb 5, 2024
1 parent 6613a10 commit 821fcc6
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions crates/symbolicator-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use std::net::ToSocketAddrs;
use std::sync::OnceLock;

use cadence::{StatsdClient, UdpMetricSink};
use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient};

static METRICS_CLIENT: OnceLock<StatsdClient> = OnceLock::new();

Expand All @@ -20,8 +20,9 @@ pub fn configure_statsd<A: ToSocketAddrs>(prefix: &str, host: A, tags: BTreeMap<
}
let socket = std::net::UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_nonblocking(true).unwrap();
let sink = UdpMetricSink::from(&addrs[..], socket).unwrap();
let mut builder = StatsdClient::builder(prefix, sink);
let udp_sink = BufferedUdpMetricSink::from(&addrs[..], socket).unwrap();
let queuing_sink = QueuingMetricSink::from(udp_sink);
let mut builder = StatsdClient::builder(prefix, queuing_sink);
for (key, value) in tags {
builder = builder.with_tag(key, value)
}
Expand Down
3 changes: 3 additions & 0 deletions crates/symbolicator-stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ symbolicator-test = { path = "../symbolicator-test" }
tempfile = "3.2.0"
tokio = { version = "1.24.2", features = ["rt-multi-thread", "macros", "time", "sync"] }
tracing-subscriber = "0.3.17"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = { version = "0.5", features = ["unprefixed_malloc_on_supported_platforms"] }
24 changes: 19 additions & 5 deletions crates/symbolicator-stress/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use std::env;
use std::future::Future;
use std::io::Write;
use std::net::{SocketAddr, TcpListener};
use std::net::{SocketAddr, TcpListener, UdpSocket};
use std::pin::Pin;

use symbolicator_service::metrics;
Expand All @@ -21,8 +21,8 @@ pub struct Config {
#[derive(Default)]
pub struct Guard {
sentry: Option<sentry::ClientInitGuard>,
pub sentry_server: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
// TODO: return the ports / futures of the http / upd servers to use
pub http_sink: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
pub udp_sink: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

pub fn init(config: Config) -> Guard {
Expand All @@ -38,7 +38,7 @@ pub fn init(config: Config) -> Guard {
listener.set_nonblocking(true).unwrap();
let socket = listener.local_addr().unwrap();

guard.sentry_server = Some(Box::pin(async move {
guard.http_sink = Some(Box::pin(async move {
async fn ok() -> &'static str {
"OK"
}
Expand Down Expand Up @@ -86,7 +86,21 @@ pub fn init(config: Config) -> Guard {
}

if config.metrics {
let host = "TODO"; // create a *real* noop udp server to send metrics to
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = UdpSocket::bind(addr).unwrap();
listener.set_nonblocking(true).unwrap();
let socket = listener.local_addr().unwrap();

guard.udp_sink = Some(Box::pin(async move {
let listener = tokio::net::UdpSocket::from_std(listener).unwrap();
let mut buf = Vec::with_capacity(1024);
loop {
buf.clear();
let _len = listener.recv_buf(&mut buf).await.unwrap();
}
}));

let host = format!("127.0.0.1:{}", socket.port());

// have some default tags, just to be closer to the real world config
let mut tags = BTreeMap::new();
Expand Down
17 changes: 13 additions & 4 deletions crates/symbolicator-stress/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ mod workloads;
use stresstest::perform_stresstest;
use workloads::WorkloadsConfig;

#[cfg(not(target_env = "msvc"))]
use jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

/// Command line interface parser.
#[derive(Parser)]
struct Cli {
Expand Down Expand Up @@ -47,8 +54,7 @@ fn main() -> Result<()> {
backtraces: true,
tracing: true,
sentry: true,
// TODO: init metrics
..Default::default()
metrics: true,
});

let megs = 1024 * 1024;
Expand All @@ -57,8 +63,11 @@ fn main() -> Result<()> {
.thread_stack_size(8 * megs)
.build()?;

if let Some(sentry_server) = logging_guard.sentry_server.take() {
runtime.spawn(sentry_server);
if let Some(http_sink) = logging_guard.http_sink.take() {
runtime.spawn(http_sink);
}
if let Some(udp) = logging_guard.udp_sink.take() {
runtime.spawn(udp);
}

runtime.block_on(perform_stresstest(service_config, workloads, cli.duration))?;
Expand Down
2 changes: 1 addition & 1 deletion crates/symbolicator-stress/src/stresstest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn perform_stresstest(
let (concurrency, ops) = task.unwrap();

let ops_ps = ops as f32 / duration.as_secs() as f32;
println!("Workload {i} (concurrency: {concurrency}): {ops} operations, {ops_ps} ops/s");
println!("Workload {i} (concurrency: {concurrency}): {ops} operations, {ops_ps:.2} ops/s");
}

Ok(())
Expand Down

0 comments on commit 821fcc6

Please sign in to comment.