Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network-level telemetry via sink stats #203

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cadence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ pub use self::client::{

pub use self::sinks::{
BufferedSpyMetricSink, BufferedUdpMetricSink, MetricSink, NopMetricSink, QueuingMetricSink,
QueuingMetricSinkBuilder, SpyMetricSink, UdpMetricSink,
QueuingMetricSinkBuilder, SinkStats, SpyMetricSink, UdpMetricSink,
};

pub use self::types::{
Expand Down
70 changes: 70 additions & 0 deletions cadence/src/sinks/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,68 @@
// except according to those terms.

use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Clone, Debug, Default)]
pub struct SinkStats {
pub bytes_sent: u64,
pub packets_sent: u64,
pub bytes_dropped: u64,
pub packets_dropped: u64,
}

#[derive(Debug, Clone, Default)]
pub(crate) struct SocketStats {
bytes_sent: Arc<AtomicU64>,
packets_sent: Arc<AtomicU64>,
bytes_dropped: Arc<AtomicU64>,
packets_dropped: Arc<AtomicU64>,
}

impl SocketStats {
pub(crate) fn incr_bytes_sent(&self, n: u64) {
self.bytes_sent.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_sent(&self) {
self.packets_sent.fetch_add(1, Ordering::Relaxed);
}

pub(crate) fn incr_bytes_dropped(&self, n: u64) {
self.bytes_dropped.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_dropped(&self) {
self.packets_dropped.fetch_add(1, Ordering::Relaxed);
}

pub(crate) fn update(&self, res: io::Result<usize>, len: usize) -> io::Result<usize> {
match res {
Ok(written) => {
self.incr_bytes_sent(written as u64);
self.incr_packets_sent();
Ok(written)
}
Err(e) => {
self.incr_bytes_dropped(len as u64);
self.incr_packets_dropped();
Err(e)
}
}
}
}

impl From<&SocketStats> for SinkStats {
fn from(stats: &SocketStats) -> Self {
SinkStats {
bytes_sent: stats.bytes_sent.load(Ordering::Relaxed),
packets_sent: stats.packets_sent.load(Ordering::Relaxed),
bytes_dropped: stats.bytes_dropped.load(Ordering::Relaxed),
packets_dropped: stats.packets_dropped.load(Ordering::Relaxed),
}
}
}

/// Trait for various backends that send Statsd metrics somewhere.
///
Expand Down Expand Up @@ -77,6 +139,14 @@ pub trait MetricSink {
fn flush(&self) -> io::Result<()> {
Ok(())
}

mlowicki marked this conversation as resolved.
Show resolved Hide resolved
/// Return I/O telemetry like bytes / packets sent or dropped.
///
/// Note that not all sinks implement this method and the default implementation
/// returns zeros.
fn stats(&self) -> SinkStats {
SinkStats::default()
}
}

/// Implementation of a `MetricSink` that discards all metrics.
Expand Down
2 changes: 1 addition & 1 deletion cadence/src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod queuing;
mod spy;
mod udp;

pub use crate::sinks::core::{MetricSink, NopMetricSink};
pub use crate::sinks::core::{MetricSink, NopMetricSink, SinkStats};
pub use crate::sinks::queuing::{QueuingMetricSink, QueuingMetricSinkBuilder};
pub use crate::sinks::spy::{BufferedSpyMetricSink, SpyMetricSink};
pub use crate::sinks::udp::{BufferedUdpMetricSink, UdpMetricSink};
Expand Down
6 changes: 5 additions & 1 deletion cadence/src/sinks/queuing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats};
use crossbeam_channel::{self, Receiver, Sender, TrySendError};
use std::fmt;
use std::io::{self, ErrorKind};
Expand Down Expand Up @@ -273,6 +273,10 @@ impl MetricSink for QueuingMetricSink {
fn flush(&self) -> Result<(), std::io::Error> {
self.sink.flush()
}

fn stats(&self) -> SinkStats {
self.sink.stats()
}
}

impl Drop for QueuingMetricSink {
Expand Down
32 changes: 25 additions & 7 deletions cadence/src/sinks/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats, SocketStats};
use crate::types::{ErrorKind, MetricError, MetricResult};

// Default size of the buffer for buffered metric sinks. This
Expand Down Expand Up @@ -52,6 +52,7 @@ fn get_addr<A: ToSocketAddrs>(addr: A) -> MetricResult<SocketAddr> {
pub struct UdpMetricSink {
addr: SocketAddr,
socket: UdpSocket,
stats: SocketStats,
}

impl UdpMetricSink {
Expand Down Expand Up @@ -103,13 +104,19 @@ impl UdpMetricSink {
A: ToSocketAddrs,
{
let addr = get_addr(to_addr)?;
Ok(UdpMetricSink { addr, socket })
let stats = SocketStats::default();
Ok(UdpMetricSink { addr, socket, stats })
}
}

impl MetricSink for UdpMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
self.socket.send_to(metric.as_bytes(), self.addr)
self.stats
.update(self.socket.send_to(metric.as_bytes(), self.addr), metric.len())
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

Expand All @@ -118,17 +125,18 @@ impl MetricSink for UdpMetricSink {
pub(crate) struct UdpWriteAdapter {
addr: SocketAddr,
socket: UdpSocket,
stats: SocketStats,
}

impl UdpWriteAdapter {
pub(crate) fn new(addr: SocketAddr, socket: UdpSocket) -> UdpWriteAdapter {
UdpWriteAdapter { addr, socket }
pub(crate) fn new(addr: SocketAddr, socket: UdpSocket, stats: SocketStats) -> UdpWriteAdapter {
UdpWriteAdapter { addr, socket, stats }
}
}

impl Write for UdpWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.socket.send_to(buf, self.addr)
self.stats.update(self.socket.send_to(buf, self.addr), buf.len())
}

fn flush(&mut self) -> io::Result<()> {
Expand Down Expand Up @@ -161,6 +169,7 @@ impl Write for UdpWriteAdapter {
#[derive(Debug)]
pub struct BufferedUdpMetricSink {
buffer: Mutex<MultiLineWriter<UdpWriteAdapter>>,
stats: SocketStats,
}

impl BufferedUdpMetricSink {
Expand Down Expand Up @@ -238,8 +247,13 @@ impl BufferedUdpMetricSink {
A: ToSocketAddrs,
{
let addr = get_addr(sink_addr)?;
let stats = SocketStats::default();
Ok(BufferedUdpMetricSink {
buffer: Mutex::new(MultiLineWriter::new(UdpWriteAdapter::new(addr, socket), cap)),
buffer: Mutex::new(MultiLineWriter::new(
UdpWriteAdapter::new(addr, socket, stats.clone()),
cap,
)),
stats,
})
}
}
Expand All @@ -254,6 +268,10 @@ impl MetricSink for BufferedUdpMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

#[cfg(test)]
Expand Down
32 changes: 27 additions & 5 deletions cadence/src/sinks/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::path::{Path, PathBuf};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats, SocketStats};

// Default size of the buffer for buffered metric sinks. This
// is a rather conservative value, picked for consistency with
Expand All @@ -41,6 +41,7 @@ const DEFAULT_BUFFER_SIZE: usize = 512;
pub struct UnixMetricSink {
socket: UnixDatagram,
path: PathBuf,
stats: SocketStats,
}

impl UnixMetricSink {
Expand Down Expand Up @@ -77,16 +78,25 @@ impl UnixMetricSink {
where
P: AsRef<Path>,
{
let stats = SocketStats::default();
UnixMetricSink {
path: path.as_ref().to_path_buf(),
socket,
stats,
}
}
}

impl MetricSink for UnixMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
self.socket.send_to(metric.as_bytes(), self.path.as_path())
self.stats.update(
self.socket.send_to(metric.as_bytes(), self.path.as_path()),
metric.len(),
)
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

Expand All @@ -95,23 +105,25 @@ impl MetricSink for UnixMetricSink {
pub(crate) struct UnixWriteAdapter {
path: PathBuf,
socket: UnixDatagram,
stats: SocketStats,
}

impl UnixWriteAdapter {
fn new<P>(socket: UnixDatagram, path: P) -> UnixWriteAdapter
fn new<P>(socket: UnixDatagram, path: P, stats: SocketStats) -> UnixWriteAdapter
where
P: AsRef<Path>,
{
UnixWriteAdapter {
path: path.as_ref().to_path_buf(),
socket,
stats,
}
}
}

impl Write for UnixWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.socket.send_to(buf, &self.path)
self.stats.update(self.socket.send_to(buf, &self.path), buf.len())
}

fn flush(&mut self) -> io::Result<()> {
Expand Down Expand Up @@ -147,6 +159,7 @@ impl Write for UnixWriteAdapter {
#[derive(Debug)]
pub struct BufferedUnixMetricSink {
buffer: Mutex<MultiLineWriter<UnixWriteAdapter>>,
stats: SocketStats,
}

impl BufferedUnixMetricSink {
Expand Down Expand Up @@ -202,8 +215,13 @@ impl BufferedUnixMetricSink {
where
P: AsRef<Path>,
{
let stats = SocketStats::default();
BufferedUnixMetricSink {
buffer: Mutex::new(MultiLineWriter::new(UnixWriteAdapter::new(socket, path), cap)),
buffer: Mutex::new(MultiLineWriter::new(
UnixWriteAdapter::new(socket, path, stats.clone()),
cap,
)),
stats,
}
}
}
Expand All @@ -218,6 +236,10 @@ impl MetricSink for BufferedUnixMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

#[cfg(test)]
Expand Down