Skip to content
This repository has been archived by the owner on Jun 28, 2022. It is now read-only.

Paullgdc/switch ddprof http client #32

Merged
merged 15 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
430 changes: 161 additions & 269 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion ddprof-exporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@ http = "0.2"
lazy_static = "1.4"
libc = "0.2"
regex = "1.5"
reqwest = { version = "0.11", features = ["blocking", "multipart", "rustls-tls"], default-features = false }
hyper = { version = "0.14", features = ["http1", "client", "tcp", "stream"], default-features = false }
tokio = { version = "1.8", features = ["rt"]}
percent-encoding = "2.1"
futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }
mime_guess = { version = "2.0", default-features = false }
http-body = "0.4"
pin-project-lite = "0.2.0"
hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] }
hex = "0.4"
hyper-multipart-rfc7578 = { git = "https://github.com/paullegranddc/rust-multipart-rfc7578.git", rev = "8dcedc266e50876c04c91d24390fe9ac44f10b96" }

[dev-dependencies]
maplit = "1.0"
204 changes: 204 additions & 0 deletions ddprof-exporter/src/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Tokio doesn't handle unix sockets on windows
#[cfg(unix)]
pub(crate) mod uds {
use pin_project_lite::pin_project;
use std::error::Error;
use std::ffi::OsString;
use std::os::unix::ffi::{OsStrExt, OsStringExt};
use std::path::{Path, PathBuf};

/// Creates a new Uri, with the `unix` scheme, and the path to the socket
/// encoded as a hex string, to prevent special characters in the url authority
pub fn socket_path_to_uri(path: &Path) -> Result<hyper::Uri, Box<dyn Error>> {
let path = hex::encode(path.as_os_str().as_bytes());
Ok(hyper::Uri::builder()
.scheme("unix")
.authority(path)
.path_and_query("")
.build()?)
}

pub fn socket_path_from_uri(
uri: &hyper::Uri,
) -> Result<PathBuf, Box<dyn Error + Sync + Send + 'static>> {
if uri.scheme_str() != Some("unix") {
return Err(crate::errors::Error::InvalidUrl.into());
}
let path = hex::decode(
uri.authority()
.ok_or(crate::errors::Error::InvalidUrl)?
.as_str(),
)
.map_err(|_| crate::errors::Error::InvalidUrl)?;
Ok(PathBuf::from(OsString::from_vec(path)))
}

#[test]
fn test_encode_unix_socket_path_absolute() {
let expected_path = "/path/to/a/socket.sock".as_ref();
let uri = socket_path_to_uri(expected_path).unwrap();
assert_eq!(uri.scheme_str(), Some("unix"));

let actual_path = socket_path_from_uri(&uri).unwrap();
assert_eq!(actual_path.as_path(), Path::new(expected_path))
}

#[test]
fn test_encode_unix_socket_relative_path() {
let expected_path = "relative/path/to/a/socket.sock".as_ref();
let uri = socket_path_to_uri(expected_path).unwrap();
let actual_path = socket_path_from_uri(&uri).unwrap();
assert_eq!(actual_path.as_path(), Path::new(expected_path));

let expected_path = "./relative/path/to/a/socket.sock".as_ref();
let uri = socket_path_to_uri(expected_path).unwrap();
let actual_path = socket_path_from_uri(&uri).unwrap();
assert_eq!(actual_path.as_path(), Path::new(expected_path));
}

pin_project! {
#[project = ConnStreamProj]
pub(crate) enum ConnStream {
Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream<tokio::net::TcpStream> },
Udp{ #[pin] transport: tokio::net::UnixStream },
}
}
}

#[cfg(unix)]
use uds::{ConnStream, ConnStreamProj};

#[cfg(not(unix))]
pin_project_lite::pin_project! {
#[project = ConnStreamProj]
pub(crate) enum ConnStream {
Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream<tokio::net::TcpStream> },
}
}

#[derive(Clone)]
pub(crate) struct Connector {
tcp: hyper_rustls::HttpsConnector<hyper::client::HttpConnector>,
}

impl Connector {
pub(crate) fn new() -> Self {
Self {
tcp: hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.build(),
}
}
}

impl tokio::io::AsyncRead for ConnStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.project() {
ConnStreamProj::Tcp { transport } => transport.poll_read(cx, buf),
#[cfg(unix)]
ConnStreamProj::Udp { transport } => transport.poll_read(cx, buf),
}
}
}

impl hyper::client::connect::Connection for ConnStream {
fn connected(&self) -> hyper::client::connect::Connected {
match self {
Self::Tcp { transport } => transport.connected(),
#[cfg(unix)]
Self::Udp { transport: _ } => hyper::client::connect::Connected::new(),
}
}
}

impl tokio::io::AsyncWrite for ConnStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
match self.project() {
ConnStreamProj::Tcp { transport } => transport.poll_write(cx, buf),
#[cfg(unix)]
ConnStreamProj::Udp { transport } => transport.poll_write(cx, buf),
}
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
match self.project() {
ConnStreamProj::Tcp { transport } => transport.poll_shutdown(cx),
#[cfg(unix)]
ConnStreamProj::Udp { transport } => transport.poll_shutdown(cx),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
match self.project() {
ConnStreamProj::Tcp { transport } => transport.poll_flush(cx),
#[cfg(unix)]
ConnStreamProj::Udp { transport } => transport.poll_flush(cx),
}
}
}

impl hyper::service::Service<hyper::Uri> for Connector {
type Response = ConnStream;
type Error = Box<dyn Error + Sync + Send>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&mut self, uri: hyper::Uri) -> Self::Future {
match uri.scheme_str() {
Some("unix") => Box::pin(async move {
#[cfg(unix)]
{
let path = uds::socket_path_from_uri(&uri)?;
Ok(ConnStream::Udp {
transport: tokio::net::UnixStream::connect(path).await?,
})
}
#[cfg(not(unix))]
{
Err(crate::errors::Error::UnixSockeUnsuported.into())
}
}),
_ => {
let fut = self.tcp.call(uri);
Box::pin(async {
Ok(ConnStream::Tcp {
transport: fut.await?,
})
})
}
}
}

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tcp.poll_ready(cx).map_err(|e| e.into())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
/// Verify that the Connector type implements the correct bound Connect + Clone
/// to be able to use the hyper::Client
fn test_hyper_client_from_connector() {
let _: hyper::Client<Connector> = hyper::Client::builder().build(Connector::new());
}
}
22 changes: 22 additions & 0 deletions ddprof-exporter/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::error;
use std::fmt;

#[derive(Clone, Debug)]
#[allow(dead_code)]
pub(crate) enum Error {
InvalidUrl,
OperationTimedOut,
UnixSockeUnsuported
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::InvalidUrl => "invalid url",
Self::OperationTimedOut => "operation timed out",
Self::UnixSockeUnsuported => "unix sockets unsuported on windows"
})
}
}

impl error::Error for Error {}
Loading