Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert UDS writers to use UnixDatagram instead of UnixStream #89

Merged
merged 6 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ edition = "2018"
[dependencies]
crossbeam-channel = "0.3.8"

[dev-dependencies]
tempdir = "0.3.7"

[lib]
name = "cadence"
path = "src/lib.rs"
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,13 +346,13 @@ client.set("users.uniques", 42);
Cadence also supports using UDS with the `UdsMetricSink`:

``` rust
use std::os::unix::net::UnixStream;
use std::os::unix::net::UnixDatagram;
use cadence::prelude::*;
use cadence::{StatsdClient, UdsMetricSink};

let socket = UnixStream::connect("/tmp/sock").unwrap();
let socket = UnixDatagram::unbound();
socket.set_nonblocking(true).unwrap();
let sink = UdsMetricSink::from(socket);
let sink = UdsMetricSink::from(socket, "/tmp/sock");
let client = StatsdClient::from_sink("my.prefix", sink);

client.count("my.counter.thing", 29);
Expand Down
3 changes: 1 addition & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@

use std::fmt;
use std::net::{ToSocketAddrs, UdpSocket};
use std::os::unix::net::UnixStream;
use std::panic::RefUnwindSafe;
use std::sync::Arc;
use std::time::Duration;

use crate::builder::{MetricBuilder, MetricFormatter};
use crate::sinks::{MetricSink, UdpMetricSink, UdsMetricSink};
use crate::sinks::{MetricSink, UdpMetricSink};
use crate::types::{
Counter, ErrorKind, Gauge, Histogram, Meter, Metric, MetricError, MetricResult, Set, Timer,
};
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,13 @@
//!
//!
//! ``` rust,no_run
//! use std::os::unix::net::UnixStream;
//! use std::os::unix::net::UnixDatagram;
//! use cadence::prelude::*;
//! use cadence::{StatsdClient, UdsMetricSink};
//!
//! let socket = UnixStream::connect("/tmp/sock").unwrap();
//! let socket = UnixDatagram::unbound().unwrap();
//! socket.set_nonblocking(true).unwrap();
//! let sink = UdsMetricSink::from(socket);
//! let sink = UdsMetricSink::from(socket, "/tmp/sock");
//! let client = StatsdClient::from_sink("my.prefix", sink);
//!
//! client.count("my.counter.thing", 29);
Expand Down
116 changes: 80 additions & 36 deletions src/sinks/uds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

use std::io;
use std::io::Write;
use std::os::unix::net::UnixStream;
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
Expand All @@ -31,7 +32,8 @@ const DEFAULT_BUFFER_SIZE: usize = 512;
/// called, in the thread of the caller.
#[derive(Debug)]
pub struct UdsMetricSink {
socket: Mutex<UnixStream>,
socket: UnixDatagram,
path: PathBuf,
}

impl UdsMetricSink {
Expand All @@ -43,40 +45,63 @@ impl UdsMetricSink {
/// # Example
///
/// ```no_run
/// use std::os::unix::net::UnixStream;
/// use std::os::unix::net::UnixDatagram;
/// use cadence::UdsMetricSink;
///
/// let socket = UnixStream::connect("/tmp/sock").unwrap();
/// let sink = UdsMetricSink::from(socket);
/// let socket = UnixDatagram::unbound().unwrap();
/// let sink = UdsMetricSink::from(socket, "/tmp/sock");
/// ```
///
/// To send metrics over a non-blocking socket, simply put the socket
/// in non-blocking mode before creating the UDS metric sink.
///
/// # Non-blocking Example
///
/// Note that putting the UDS socket into non-blocking mode is the
/// default when sink and socket are automatically created with the
/// `StatsdClient::from_uds_path` method.
///
/// ```no_run
/// use std::os::unix::net::UnixStream;
/// use std::os::unix::net::UnixDatagram;
/// use cadence::UdsMetricSink;
///
/// let socket = UnixStream::connect("/tmp/sock").unwrap();
/// let socket = UnixDatagram::unbound().unwrap();
/// socket.set_nonblocking(true).unwrap();
/// let sink = UdsMetricSink::from(socket);
/// let sink = UdsMetricSink::from(socket, "/tmp/sock");
/// ```
pub fn from(socket: UnixStream) -> UdsMetricSink {
pub fn from<P: AsRef<Path>>(socket: UnixDatagram, path: P) -> UdsMetricSink {
UdsMetricSink {
socket: Mutex::new(socket),
socket: socket,
path: path.as_ref().to_path_buf(),
}
}
}

impl MetricSink for UdsMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
self.socket.lock().unwrap().write(metric.as_bytes())
self.socket.send_to(metric.as_bytes(), self.path.as_path())
}
}

/// Adapter for writing to a `UdsSocket` via the `Write` trait
56quarters marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug)]
pub(crate) struct UdsWriteAdapter {
socket: UnixDatagram,
path: PathBuf,
}

impl UdsWriteAdapter {
fn new<P: AsRef<Path>>(socket: UnixDatagram, path: P) -> UdsWriteAdapter {
UdsWriteAdapter {
socket: socket,
path: path.as_ref().to_path_buf(),
}
}
}

impl Write for UdsWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.socket.send_to(buf, self.path.as_path())
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

Expand All @@ -97,7 +122,7 @@ impl MetricSink for UdsMetricSink {
/// directly to the underlying UDS socket, bypassing the buffer.
#[derive(Debug)]
pub struct BufferedUdsMetricSink {
buffer: Mutex<MultiLineWriter<UnixStream>>,
buffer: Mutex<MultiLineWriter<UdsWriteAdapter>>,
}

impl BufferedUdsMetricSink {
Expand All @@ -115,14 +140,14 @@ impl BufferedUdsMetricSink {
/// # Example
///
/// ```no_run
/// use std::os::unix::net::UnixStream;
/// use std::os::unix::net::UnixDatagram;
/// use cadence::BufferedUdsMetricSink;
///
/// let socket = UnixStream::connect("/tmp/sock").unwrap();
/// let sink = BufferedUdsMetricSink::from(socket);
/// let socket = UnixDatagram::unbound().unwrap();
/// let sink = BufferedUdsMetricSink::from(socket, "/tmp/sock");
/// ```
pub fn from(socket: UnixStream) -> BufferedUdsMetricSink {
Self::with_capacity(socket, DEFAULT_BUFFER_SIZE)
pub fn from<P: AsRef<Path>>(socket: UnixDatagram, path: P) -> BufferedUdsMetricSink {
Self::with_capacity(socket, path, DEFAULT_BUFFER_SIZE)
}

/// Construct a new `BufferedUdsMetricSink` instance with a custom
Expand All @@ -141,15 +166,22 @@ impl BufferedUdsMetricSink {
/// # Example
///
/// ```no_run
/// use std::os::unix::net::UnixStream;
/// use std::os::unix::net::UnixDatagram;
/// use cadence::BufferedUdsMetricSink;
///
/// let socket = UnixStream::connect("/tmp/sock").unwrap();
/// let sink = BufferedUdsMetricSink::with_capacity(socket, 1432);
/// let socket = UnixDatagram::unbound().unwrap();
/// let sink = BufferedUdsMetricSink::with_capacity(socket, "/tmp/sock", 1432);
/// ```
pub fn with_capacity(socket: UnixStream, cap: usize) -> BufferedUdsMetricSink {
pub fn with_capacity<P: AsRef<Path>>(
socket: UnixDatagram,
path: P,
cap: usize,
) -> BufferedUdsMetricSink {
BufferedUdsMetricSink {
buffer: Mutex::new(MultiLineWriter::new(cap, socket)),
buffer: Mutex::new(MultiLineWriter::new(
cap,
UdsWriteAdapter::new(socket, path),
)),
}
}
}
Expand All @@ -164,29 +196,41 @@ impl MetricSink for BufferedUdsMetricSink {
#[cfg(test)]
mod tests {
use super::{BufferedUdsMetricSink, MetricSink, UdsMetricSink};
use std::os::unix::net::UnixStream;
use std::os::unix::net::UnixDatagram;
use tempdir::TempDir;

#[test]
fn test_uds_metric_sink() {
let (socket, _recv) = UnixStream::pair().unwrap();
let sink = UdsMetricSink::from(socket);
let socket = UnixDatagram::unbound().unwrap();
let tmp_dir = TempDir::new("testing").unwrap();
let file_path = tmp_dir.path().join("tmp.sock");
// Create a listener.
let _listener = UnixDatagram::bind(&file_path);
let sink = UdsMetricSink::from(socket, file_path);
assert_eq!(7, sink.emit("buz:1|m").unwrap());
}

#[test]
fn test_non_blocking_udp_metric_sink() {
let (socket, _recv) = UnixStream::pair().unwrap();
socket.set_nonblocking(true).unwrap();
let sink = UdsMetricSink::from(socket);
fn test_non_blocking_uds_metric_sink() {
56quarters marked this conversation as resolved.
Show resolved Hide resolved
let socket = UnixDatagram::unbound().unwrap();
let tmp_dir = TempDir::new("testing").unwrap();
let file_path = tmp_dir.path().join("tmp.sock");
// Create a listener.
let _listener = UnixDatagram::bind(&file_path);
let sink = UdsMetricSink::from(socket, file_path);
assert_eq!(7, sink.emit("baz:1|m").unwrap());
}

#[test]
fn test_buffered_udp_metric_sink() {
let (socket, _recv) = UnixStream::pair().unwrap();
fn test_buffered_uds_metric_sink() {
let socket = UnixDatagram::unbound().unwrap();
let tmp_dir = TempDir::new("testing").unwrap();
let file_path = tmp_dir.path().join("tmp.sock");
// Create a listener.
let _listener = UnixDatagram::bind(&file_path);
// Set the capacity of the buffer such that we know it will
// be flushed as a response to the metrics we're writing.
let sink = BufferedUdsMetricSink::with_capacity(socket, 16);
let sink = BufferedUdsMetricSink::with_capacity(socket, file_path, 16);

// Note that we're including an extra byte in the expected
// number written since each metric is followed by a '\n' at
Expand Down