Skip to content

Commit

Permalink
move code around
Browse files Browse the repository at this point in the history
  • Loading branch information
mlowicki committed Mar 19, 2024
1 parent 4273460 commit 4c0ff34
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
45 changes: 44 additions & 1 deletion cadence/src/sinks/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::io;
use std::{
io,
sync::{
atomic::{AtomicU64, Ordering},
Arc
},
};

#[derive(Debug, Default)]
pub struct SinkStats {
Expand All @@ -18,6 +24,43 @@ pub struct SinkStats {
pub packets_dropped: u64,
}

#[derive(Debug, Clone, Default)]
pub(crate) struct WriterStats {
bytes_sent: Arc<AtomicU64>,
packets_sent: Arc<AtomicU64>,
bytes_dropped: Arc<AtomicU64>,
packets_dropped: Arc<AtomicU64>,
}

impl WriterStats {
pub(crate) fn incr_bytes_sent(&self, n: u64) {
self.bytes_sent.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_sent(&self) {
self.packets_sent.fetch_add(1, Ordering::Relaxed);
}

pub(crate) fn incr_bytes_dropped(&self, n: u64) {
self.bytes_dropped.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_dropped(&self) {
self.packets_dropped.fetch_add(1, Ordering::Relaxed);
}
}

impl From<&WriterStats> for SinkStats {
fn from(stats: &WriterStats) -> Self {
SinkStats {
bytes_sent: stats.bytes_sent.load(Ordering::Relaxed),
packets_sent: stats.packets_sent.load(Ordering::Relaxed),
bytes_dropped: stats.bytes_dropped.load(Ordering::Relaxed),
packets_dropped: stats.packets_dropped.load(Ordering::Relaxed),
}
}
}

/// Trait for various backends that send Statsd metrics somewhere.
///
/// The metric string will be in the canonical format to be sent to a
Expand Down
45 changes: 6 additions & 39 deletions cadence/src/sinks/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ use std::io;
use std::io::Write;
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
use crate::sinks::core::{MetricSink, SinkStats};
use crate::sinks::core::{MetricSink, SinkStats, WriterStats};

// Default size of the buffer for buffered metric sinks. This
// is a rather conservative value, picked for consistency with
Expand Down Expand Up @@ -93,36 +90,6 @@ impl MetricSink for UnixMetricSink {
}
}

#[derive(Debug, Clone)]
struct WriterStats {
bytes_sent: Arc<AtomicU64>,
packets_sent: Arc<AtomicU64>,
bytes_dropped: Arc<AtomicU64>,
packets_dropped: Arc<AtomicU64>,
}

impl Default for WriterStats {
fn default() -> Self {
Self {
bytes_sent: Arc::new(AtomicU64::new(0)),
packets_sent: Arc::new(AtomicU64::new(0)),
bytes_dropped: Arc::new(AtomicU64::new(0)),
packets_dropped: Arc::new(AtomicU64::new(0)),
}
}
}

impl From<&WriterStats> for SinkStats {
fn from(stats: &WriterStats) -> Self {
SinkStats {
bytes_sent: stats.bytes_sent.load(Ordering::Relaxed),
packets_sent: stats.packets_sent.load(Ordering::Relaxed),
bytes_dropped: stats.bytes_dropped.load(Ordering::Relaxed),
packets_dropped: stats.packets_dropped.load(Ordering::Relaxed),
}
}
}

/// Adapter for writing to a `UnixDatagram` socket via the `Write` trait
#[derive(Debug)]
pub(crate) struct UnixWriteAdapter {
Expand All @@ -148,13 +115,13 @@ impl Write for UnixWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.socket.send_to(buf, &self.path) {
Ok(written) => {
self.stats.bytes_sent.fetch_add(written as u64, Ordering::Relaxed);
self.stats.packets_sent.fetch_add(1, Ordering::Relaxed);
self.stats.incr_bytes_sent(written as u64);
self.stats.incr_packets_sent();
Ok(written)
}
Err(e) => {
self.stats.bytes_dropped.fetch_add(buf.len() as u64, Ordering::Relaxed);
self.stats.packets_dropped.fetch_add(1, Ordering::Relaxed);
self.stats.incr_bytes_dropped(buf.len() as u64);
self.stats.incr_packets_dropped();
Err(e)
}
}
Expand Down

0 comments on commit 4c0ff34

Please sign in to comment.