From 12c5d610d9efe5e0985ca112cbeb81490997df4c Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 25 Feb 2022 15:57:27 +0100 Subject: [PATCH 01/14] * Switch the http client from reqwest to hyper * Take the form multipart implementation from reqwest and adapt it to use with hyper --- Cargo.lock | 455 +------------------ ddprof-exporter/Cargo.toml | 8 +- ddprof-exporter/src/lib.rs | 159 ++++--- ddprof-exporter/src/multipart.rs | 741 +++++++++++++++++++++++++++++++ ddprof-exporter/tests/form.rs | 15 +- ddprof-ffi/Cargo.toml | 2 +- ddprof-ffi/src/exporter.rs | 15 +- 7 files changed, 888 insertions(+), 507 deletions(-) create mode 100644 ddprof-exporter/src/multipart.rs diff --git a/Cargo.lock b/Cargo.lock index 59055bf..87873e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,36 +23,18 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" -[[package]] -name = "base64" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" - [[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "bumpalo" -version = "3.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c" - [[package]] name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" -[[package]] -name = "cc" -version = "1.0.71" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" - [[package]] name = "cfg-if" version = "1.0.0" @@ -74,7 +56,7 @@ dependencies = [ [[package]] name = "ddprof" -version = "0.2.0" +version = "0.3.0" dependencies = [ "ddprof-exporter", "ddprof-ffi", @@ -83,34 +65,40 @@ dependencies = [ [[package]] name = "ddprof-exporter" -version = "0.2.0" +version = "0.3.0" dependencies = [ "bytes", "chrono", "futures", + "futures-core", + "futures-util", "http", + "http-body", + "hyper", "lazy_static", "libc", "maplit", + "mime_guess", + "percent-encoding", + "pin-project-lite", "regex", - "reqwest", "tokio", ] [[package]] name = "ddprof-ffi" -version = "0.2.0" +version = "0.3.0" dependencies = [ "chrono", "ddprof-exporter", "ddprof-profiles", + "hyper", "libc", - "reqwest", ] [[package]] name = "ddprof-profiles" -version = "0.2.0" +version = "0.3.0" dependencies = [ "indexmap", "libc", @@ -125,15 +113,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" -[[package]] -name = "encoding_rs" -version = "0.8.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a74ea89a0a1b98f6332de42c95baff457ada66d1cb4030f9ff151b2041a1c746" -dependencies = [ - "cfg-if", -] - [[package]] name = "fixedbitset" version = "0.2.0" @@ -146,16 +125,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "form_urlencoded" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" -dependencies = [ - "matches", - "percent-encoding", -] - [[package]] name = "futures" version = "0.3.17" @@ -261,25 +230,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "h2" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fd819562fcebdac5afc5c113c3ec36f902840b70fd4fc458799c8ce4607ae55" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.11.2" @@ -295,15 +245,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "http" version = "0.2.5" @@ -348,7 +289,6 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", "http", "http-body", "httparse", @@ -362,32 +302,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" -dependencies = [ - "futures-util", - "hyper", - "log", - "rustls", - "tokio", - "tokio-rustls", - "webpki", -] - -[[package]] -name = "idna" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "indexmap" version = "1.7.0" @@ -398,12 +312,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "ipnet" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" - [[package]] name = "itertools" version = "0.10.1" @@ -419,15 +327,6 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" -[[package]] -name = "js-sys" -version = "0.3.55" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" -dependencies = [ - "wasm-bindgen", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -455,12 +354,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" -[[package]] -name = "matches" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" - [[package]] name = "memchr" version = "2.4.1" @@ -475,9 +368,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "mime_guess" -version = "2.0.3" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" dependencies = [ "mime", "unicase", @@ -513,9 +406,9 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "ntapi" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" dependencies = [ "winapi", ] @@ -539,22 +432,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" -dependencies = [ - "hermit-abi", - "libc", -] - -[[package]] -name = "once_cell" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" - [[package]] name = "percent-encoding" version = "2.1.0" @@ -745,116 +622,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "reqwest" -version = "0.11.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d2927ca2f685faf0fc620ac4834690d29e7abb153add10f5812eef20b5e280" -dependencies = [ - "base64", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "http", - "http-body", - "hyper", - "hyper-rustls", - "ipnet", - "js-sys", - "lazy_static", - "log", - "mime", - "mime_guess", - "percent-encoding", - "pin-project-lite", - "rustls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-rustls", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "webpki-roots", - "winreg", -] - -[[package]] -name = "ring" -version = "0.16.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" -dependencies = [ - "cc", - "libc", - "once_cell", - "spin", - "untrusted", - "web-sys", - "winapi", -] - -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64", - "log", - "ring", - "sct", - "webpki", -] - -[[package]] -name = "ryu" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" - -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - -[[package]] -name = "serde" -version = "1.0.130" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" - -[[package]] -name = "serde_json" -version = "1.0.68" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" -dependencies = [ - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "serde_urlencoded" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", -] - [[package]] name = "slab" version = "0.4.5" @@ -871,12 +638,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "syn" version = "1.0.81" @@ -912,21 +673,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "tinyvec" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" - [[package]] name = "tokio" version = "1.13.0" @@ -934,40 +680,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "588b2d10a336da58d877567cd8fb8a14b463e2104910f8132cd054b4b96e29ee" dependencies = [ "autocfg", - "bytes", "libc", - "memchr", "mio", - "num_cpus", "pin-project-lite", "winapi", ] -[[package]] -name = "tokio-rustls" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" -dependencies = [ - "rustls", - "tokio", - "webpki", -] - -[[package]] -name = "tokio-util" -version = "0.6.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tower-service" version = "0.3.1" @@ -1009,21 +727,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "unicode-bidi" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" - -[[package]] -name = "unicode-normalization" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" -dependencies = [ - "tinyvec", -] - [[package]] name = "unicode-segmentation" version = "1.8.0" @@ -1036,24 +739,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - -[[package]] -name = "url" -version = "2.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" -dependencies = [ - "form_urlencoded", - "idna", - "matches", - "percent-encoding", -] - [[package]] name = "ux" version = "0.1.3" @@ -1062,9 +747,9 @@ checksum = "88dfeb711b61ce620c0cb6fd9f8e3e678622f0c971da2a63c4b3e25e88ed012f" [[package]] name = "version_check" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "want" @@ -1082,101 +767,6 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" -[[package]] -name = "wasm-bindgen" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" -dependencies = [ - "cfg-if", - "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" -dependencies = [ - "bumpalo", - "lazy_static", - "log", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-backend", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" - -[[package]] -name = "web-sys" -version = "0.3.55" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - -[[package]] -name = "webpki-roots" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" -dependencies = [ - "webpki", -] - [[package]] name = "which" version = "4.2.2" @@ -1209,12 +799,3 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "winreg" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" -dependencies = [ - "winapi", -] diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index 1c8af7c..bb13033 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -18,8 +18,14 @@ http = "0.2" lazy_static = "1.4" libc = "0.2" regex = "1.5" -reqwest = { version = "0.11", features = ["blocking", "multipart", "rustls-tls"], default-features = false } +hyper = { version = "0.14", features = ["http1", "client", "tcp", "stream"], default-features = false } tokio = { version = "1.8", features = ["rt"]} +percent-encoding = "2.1" +futures-core = { version = "0.3.0", default-features = false } +futures-util = { version = "0.3.0", default-features = false } +mime_guess = { version = "2.0", default-features = false } +http-body = "0.4" +pin-project-lite = "0.2.0" [dev-dependencies] maplit = "1.0" diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index b5cdcff..e62b181 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -3,19 +3,23 @@ use std::borrow::Cow; use std::error::Error; +use std::str::FromStr; use bytes::Bytes; -use reqwest::header::HeaderValue; -use reqwest::Url; +use hyper::header::HeaderValue; +use hyper::Uri; use tokio::runtime::Runtime; mod container_id; +mod multipart; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; +type Client = hyper::Client; + pub struct Exporter { - client: reqwest::Client, + client: Client, runtime: Runtime, } @@ -39,7 +43,7 @@ pub struct FieldsV3 { } pub struct Endpoint { - url: Url, + url: Uri, api_key: Option, } @@ -50,6 +54,50 @@ pub struct ProfileExporterV3 { tags: Vec, } +pub struct Request { + timeout: Option, + req: hyper::Request, +} + +impl From> for Request { + fn from(req: hyper::Request) -> Self { + Self { req, timeout: None } + } +} + +impl Request { + fn with_timeout(mut self, timeout: std::time::Duration) -> Self { + self.timeout = if timeout != DURATION_ZERO { + Some(timeout) + } else { + None + }; + self + } + + pub fn timeout(&self) -> &Option { + &self.timeout + } + + pub fn uri(&self) -> &hyper::Uri { + self.req.uri() + } + + pub fn headers(&self) -> &hyper::HeaderMap { + self.req.headers() + } + + async fn send( + self, + client: &Client, + ) -> Result, Box> { + Ok(match self.timeout { + Some(t) => tokio::time::timeout(t, client.request(self.req)).await?, + None => client.request(self.req).await, + }?) + } +} + pub struct File<'a> { pub name: &'a str, pub bytes: &'a [u8], @@ -60,11 +108,19 @@ impl Endpoint { /// /// # Arguments /// * `base_url` - has protocol, host, and port e.g. http://localhost:8126/ - pub fn agent(base_url: Url) -> Result> { - Ok(Endpoint { - url: base_url.join("/profiling/v1/input")?, - api_key: None, - }) + pub fn agent(base_url: Uri) -> Result> { + let mut parts = base_url.into_parts(); + let p_q = match parts.path_and_query { + None => None, + Some(pq) => { + let path = pq.path(); + let path = path.strip_suffix('/').unwrap_or(path); + Some(format!("{}/profiling/v1/input", path).parse()?) + } + }; + parts.path_and_query = p_q; + let url = Uri::from_parts(parts)?; + Ok(Endpoint { url, api_key: None }) } /// Creates an Endpoint for talking to Datadog intake without using the agent. @@ -77,7 +133,7 @@ impl Endpoint { let intake_url = format!("https://intake.profile.{}/v1/input", site.as_ref()); Ok(Endpoint { - url: Url::parse(intake_url.as_str())?, + url: Uri::from_str(intake_url.as_str())?, api_key: Some(String::from(api_key.as_ref())), }) } @@ -104,19 +160,8 @@ impl ProfileExporterV3 { end: chrono::DateTime, files: &[File], timeout: std::time::Duration, - ) -> reqwest::Result { - let mut builder = self - .exporter - .client - .request(reqwest::Method::POST, self.endpoint.url.clone()) - .header("User-Agent", concat!("DDProf/", env!("CARGO_PKG_VERSION"))) - .header("Connection", "close"); - - if timeout != DURATION_ZERO { - builder = builder.timeout(timeout); - } - - let mut form = reqwest::multipart::Form::new() + ) -> Result> { + let mut form = multipart::Form::new() .text("version", "3") .text("start", start.format("%Y-%m-%dT%H:%M:%SZ").to_string()) .text("end", end.format("%Y-%m-%dT%H:%M:%SZ").to_string()) @@ -126,18 +171,29 @@ impl ProfileExporterV3 { form = form.text("tags[]", format!("{}:{}", tag.name, tag.value)); } - form = files - .iter() - .fold(form, |form, file| -> reqwest::multipart::Form { - let filename = file.name.to_owned(); - let bytes = reqwest::multipart::Part::bytes(file.bytes.to_owned()) - .file_name(filename.clone()) - .mime_str("application/octet-stream") - .expect("mime to be valid"); + form = files.iter().fold(form, |form, file| -> multipart::Form { + let filename = file.name.to_owned(); + let bytes = multipart::Part::bytes(file.bytes.to_owned()) + .file_name(filename.clone()) + .mime_str("application/octet-stream") + .expect("mime to be valid"); - form.part(format!("data[{}]", filename), bytes) - }); + form.part(format!("data[{}]", filename), bytes) + }); + let mut builder = hyper::Request::builder() + .method(http::Method::POST) + .uri(self.endpoint.url.clone()) + .header("User-Agent", concat!("DDProf/", env!("CARGO_PKG_VERSION"))) + .header("Connection", "close") + .header( + "Content-type", + format!("multipart/form-data; boundary={}", form.boundary()).as_str(), + ); + + if let Some(length) = form.compute_length() { + builder = builder.header("Content-Length", length); + } if let Some(api_key) = &self.endpoint.api_key { builder = builder.header( "DD-API-KEY", @@ -149,13 +205,16 @@ impl ProfileExporterV3 { builder = builder.header(DATADOG_CONTAINER_ID_HEADER, container_id); } - builder.multipart(form).build() + Ok( + Request::from(builder.body(hyper::Body::wrap_stream(form.stream()))?) + .with_timeout(timeout), + ) } - pub fn send(&self, request: reqwest::Request) -> reqwest::Result { + pub fn send(&self, request: Request) -> Result, Box> { self.exporter .runtime - .block_on(async { self.exporter.client.execute(request).await }) + .block_on(async { request.send(&self.exporter.client).await }) } } @@ -163,9 +222,9 @@ impl Exporter { /// Creates a new Exporter, initializing the TLS stack. pub fn new() -> Result> { // Set idle to 0, which prevents the pipe being broken every 2nd request - let client = reqwest::Client::builder() + let client = hyper::Client::builder() .pool_max_idle_per_host(0) - .build()?; + .build(hyper::client::HttpConnector::new()); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; @@ -176,23 +235,19 @@ impl Exporter { &self, http_method: http::Method, url: &str, - headers: reqwest::header::HeaderMap, + mut headers: hyper::header::HeaderMap, body: &[u8], timeout: std::time::Duration, - ) -> Result> { + ) -> Result, Box> { self.runtime.block_on(async { - let mut builder = self - .client - .request(http_method, url) - .headers(headers) - .body(reqwest::Body::from(Bytes::copy_from_slice(body))); - - if timeout != DURATION_ZERO { - builder = builder.timeout(timeout) - } - - let response = builder.send().await?; - Ok(response) + let mut request = hyper::Request::builder() + .method(http_method) + .uri(url) + .body(hyper::Body::from(Bytes::copy_from_slice(body)))?; + std::mem::swap(request.headers_mut(), &mut headers); + + let request: Request = request.into(); + request.with_timeout(timeout).send(&self.client).await }) } } diff --git a/ddprof-exporter/src/multipart.rs b/ddprof-exporter/src/multipart.rs new file mode 100644 index 0000000..144d895 --- /dev/null +++ b/ddprof-exporter/src/multipart.rs @@ -0,0 +1,741 @@ +use std::borrow::Cow; +use std::fmt; +use std::pin::Pin; +use std::str::FromStr; +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures::TryStreamExt; +use futures_core::Stream; +use futures_util::{future, stream, StreamExt}; +use http::HeaderMap; +use http_body::Body as HttpBody; +use mime_guess::Mime; +use percent_encoding::{self, AsciiSet, NON_ALPHANUMERIC}; + +type FormResult = std::result::Result>; + +/// An async multipart/form-data request. +pub struct Form { + inner: FormParts, +} + +/// A field in a multipart form. +pub struct Part { + meta: PartMetadata, + value: FormBody, + body_length: Option, +} + +pub(crate) struct FormParts

{ + pub(crate) boundary: String, + pub(crate) computed_headers: Vec>, + pub(crate) fields: Vec<(Cow<'static, str>, P)>, + pub(crate) percent_encoding: PercentEncoding, +} + +pub(crate) struct PartMetadata { + mime: Option, + file_name: Option>, + pub(crate) headers: HeaderMap, +} + +pub(crate) trait PartProps { + fn value_len(&self) -> Option; + fn metadata(&self) -> &PartMetadata; +} + +pin_project_lite::pin_project! { + #[derive(Debug)] + pub (crate) struct FormBody { + #[pin] + inner: hyper::Body + } +} + +impl FormBody { + fn content_length(&self) -> Option { + ::size_hint(&self.inner).exact() + } +} + +impl Stream for FormBody { + type Item = std::result::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().inner.poll_data(cx) + } +} + +impl From for FormBody { + fn from(inner: hyper::Body) -> Self { + Self { inner } + } +} + +// ===== impl Form ===== + +impl Default for Form { + fn default() -> Self { + Self::new() + } +} + +impl Form { + /// Creates a new async Form without any content. + pub fn new() -> Form { + Form { + inner: FormParts::new(), + } + } + + /// Get the boundary that this form will use. + #[inline] + pub fn boundary(&self) -> &str { + self.inner.boundary() + } + + /// Add a data field with supplied name and value. + /// + /// # Examples + /// + /// ```ignore + /// let form = multipart::Form::new() + /// .text("username", "seanmonstar") + /// .text("password", "secret"); + /// ``` + pub fn text(self, name: T, value: U) -> Form + where + T: Into>, + U: Into>, + { + self.part(name, Part::text(value)) + } + + /// Adds a customized Part. + pub fn part(self, name: T, part: Part) -> Form + where + T: Into>, + { + self.with_inner(move |inner| inner.part(name, part)) + } + + #[allow(dead_code)] + /// Configure this `Form` to percent-encode using the `path-segment` rules. + pub fn percent_encode_path_segment(self) -> Form { + self.with_inner(|inner| inner.percent_encode_path_segment()) + } + + #[allow(dead_code)] + /// Configure this `Form` to percent-encode using the `attr-char` rules. + pub fn percent_encode_attr_chars(self) -> Form { + self.with_inner(|inner| inner.percent_encode_attr_chars()) + } + + #[allow(dead_code)] + /// Configure this `Form` to skip percent-encoding + pub fn percent_encode_noop(self) -> Form { + self.with_inner(|inner| inner.percent_encode_noop()) + } + + /// Consume this instance and transform into an instance of Body for use in a request. + pub(crate) fn stream(mut self) -> FormBody { + if self.inner.fields.is_empty() { + return FormBody { + inner: hyper::Body::empty(), + }; + } + + // create initial part to init reduce chain + let (name, part) = self.inner.fields.remove(0); + let start = Box::pin(self.part_stream(name, part)) + as Pin> + Send + Sync>>; + + let fields = self.inner.take_fields(); + // for each field, chain an additional stream + let stream = fields.into_iter().fold(start, |memo, (name, part)| { + let part_stream = self.part_stream(name, part); + Box::pin(memo.chain(part_stream)) + as Pin> + Send + Sync>> + }); + // append special ending boundary + let last = stream::once(future::ready(Ok( + format!("--{}--\r\n", self.boundary()).into() + ))); + FormBody { + inner: hyper::Body::wrap_stream(stream.chain(last)), + } + } + + /// Generate a hyper::Body stream for a single Part instance of a Form request. + pub(crate) fn part_stream( + &mut self, + name: T, + part: Part, + ) -> impl Stream> + where + T: Into>, + { + // start with boundary + let boundary = stream::once(future::ready(Ok( + format!("--{}\r\n", self.boundary()).into() + ))); + // append headers + let header = stream::once(future::ready(Ok({ + let mut h = self + .inner + .percent_encoding + .encode_headers(&name.into(), &part.meta); + h.extend_from_slice(b"\r\n\r\n"); + h.into() + }))); + // then append form data followed by terminating CRLF + boundary + .chain(header) + .chain(part.value.map_err(|e| e.into())) + .chain(stream::once(future::ready(Ok("\r\n".into())))) + } + + pub(crate) fn compute_length(&mut self) -> Option { + self.inner.compute_length() + } + + fn with_inner(self, func: F) -> Self + where + F: FnOnce(FormParts) -> FormParts, + { + Form { + inner: func(self.inner), + } + } +} + +impl fmt::Debug for Form { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt_fields("Form", f) + } +} + +// ===== impl Part ===== + +impl Part { + /// Makes a text parameter. + pub fn text(value: T) -> Part + where + T: Into>, + { + let body = match value.into() { + Cow::Borrowed(slice) => hyper::Body::from(slice).into(), + Cow::Owned(string) => hyper::Body::from(string).into(), + }; + Part::new(body, None) + } + + /// Makes a new parameter from arbitrary bytes. + pub fn bytes(value: T) -> Part + where + T: Into>, + { + let body = match value.into() { + Cow::Borrowed(slice) => hyper::Body::from(slice).into(), + Cow::Owned(vec) => hyper::Body::from(vec).into(), + }; + Part::new(body, None) + } + + #[allow(dead_code)] + /// Makes a new parameter from an arbitrary stream. + pub(crate) fn stream>(value: T) -> Part { + Part::new(value.into(), None) + } + + #[allow(dead_code)] + /// Makes a new parameter from an arbitrary stream with a known length. This is particularly + /// useful when adding something like file contents as a stream, where you can know the content + /// length beforehand. + pub(crate) fn stream_with_length>(value: T, length: u64) -> Part { + Part::new(value.into(), Some(length)) + } + + fn new(value: FormBody, body_length: Option) -> Part { + Part { + meta: PartMetadata::new(), + value, + body_length, + } + } + + /// Tries to set the mime of this part. + pub fn mime_str(self, mime: &str) -> FormResult { + Ok(self.mime(Mime::from_str(mime).map_err(Box::new)?)) + } + + // Re-export when mime 0.4 is available, with split MediaType/MediaRange. + fn mime(self, mime: Mime) -> Part { + self.with_inner(move |inner| inner.mime(mime)) + } + + /// Sets the filename, builder style. + pub fn file_name(self, filename: T) -> Part + where + T: Into>, + { + self.with_inner(move |inner| inner.file_name(filename)) + } + + fn with_inner(self, func: F) -> Self + where + F: FnOnce(PartMetadata) -> PartMetadata, + { + Part { + meta: func(self.meta), + ..self + } + } +} + +impl fmt::Debug for Part { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut dbg = f.debug_struct("Part"); + dbg.field("value", &self.value); + self.meta.fmt_fields(&mut dbg); + dbg.finish() + } +} + +impl PartProps for Part { + fn value_len(&self) -> Option { + if self.body_length.is_some() { + self.body_length + } else { + self.value.content_length() + } + } + + fn metadata(&self) -> &PartMetadata { + &self.meta + } +} + +// ===== impl FormParts ===== + +impl FormParts

{ + pub(crate) fn new() -> Self { + FormParts { + boundary: gen_boundary(), + computed_headers: Vec::new(), + fields: Vec::new(), + percent_encoding: PercentEncoding::PathSegment, + } + } + + pub(crate) fn boundary(&self) -> &str { + &self.boundary + } + + /// Adds a customized Part. + pub(crate) fn part(mut self, name: T, part: P) -> Self + where + T: Into>, + { + self.fields.push((name.into(), part)); + self + } + + #[allow(dead_code)] + /// Configure this `Form` to percent-encode using the `path-segment` rules. + pub(crate) fn percent_encode_path_segment(mut self) -> Self { + self.percent_encoding = PercentEncoding::PathSegment; + self + } + + #[allow(dead_code)] + /// Configure this `Form` to percent-encode using the `attr-char` rules. + pub(crate) fn percent_encode_attr_chars(mut self) -> Self { + self.percent_encoding = PercentEncoding::AttrChar; + self + } + + #[allow(dead_code)] + /// Configure this `Form` to skip percent-encoding + pub(crate) fn percent_encode_noop(mut self) -> Self { + self.percent_encoding = PercentEncoding::NoOp; + self + } + + // If predictable, computes the length the request will have + // The length should be preditable if only String and file fields have been added, + // but not if a generic reader has been added; + pub(crate) fn compute_length(&mut self) -> Option { + let mut length = 0u64; + for &(ref name, ref field) in self.fields.iter() { + match field.value_len() { + Some(value_length) => { + // We are constructing the header just to get its length. To not have to + // construct it again when the request is sent we cache these headers. + let header = self.percent_encoding.encode_headers(name, field.metadata()); + let header_length = header.len(); + self.computed_headers.push(header); + // The additions mimic the format string out of which the field is constructed + // in Reader. Not the cleanest solution because if that format string is + // ever changed then this formula needs to be changed too which is not an + // obvious dependency in the code. + length += 2 + + self.boundary().len() as u64 + + 2 + + header_length as u64 + + 4 + + value_length + + 2 + } + _ => return None, + } + } + // If there is a at least one field there is a special boundary for the very last field. + if !self.fields.is_empty() { + length += 2 + self.boundary().len() as u64 + 4 + } + Some(length) + } + + /// Take the fields vector of this instance, replacing with an empty vector. + fn take_fields(&mut self) -> Vec<(Cow<'static, str>, P)> { + std::mem::replace(&mut self.fields, Vec::new()) + } +} + +impl FormParts

{ + pub(crate) fn fmt_fields(&self, ty_name: &'static str, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct(ty_name) + .field("boundary", &self.boundary) + .field("parts", &self.fields) + .finish() + } +} + +// ===== impl PartMetadata ===== + +impl PartMetadata { + pub(crate) fn new() -> Self { + PartMetadata { + mime: None, + file_name: None, + headers: HeaderMap::default(), + } + } + + pub(crate) fn mime(mut self, mime: Mime) -> Self { + self.mime = Some(mime); + self + } + + pub(crate) fn file_name(mut self, filename: T) -> Self + where + T: Into>, + { + self.file_name = Some(filename.into()); + self + } +} + +impl PartMetadata { + pub(crate) fn fmt_fields<'f, 'fa, 'fb>( + &self, + debug_struct: &'f mut fmt::DebugStruct<'fa, 'fb>, + ) -> &'f mut fmt::DebugStruct<'fa, 'fb> { + debug_struct + .field("mime", &self.mime) + .field("file_name", &self.file_name) + .field("headers", &self.headers) + } +} + +// https://url.spec.whatwg.org/#fragment-percent-encode-set +const FRAGMENT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS + .add(b' ') + .add(b'"') + .add(b'<') + .add(b'>') + .add(b'`'); + +// https://url.spec.whatwg.org/#path-percent-encode-set +const PATH_ENCODE_SET: &AsciiSet = &FRAGMENT_ENCODE_SET.add(b'#').add(b'?').add(b'{').add(b'}'); + +const PATH_SEGMENT_ENCODE_SET: &AsciiSet = &PATH_ENCODE_SET.add(b'/').add(b'%'); + +#[allow(dead_code)] +// https://tools.ietf.org/html/rfc8187#section-3.2.1 +const ATTR_CHAR_ENCODE_SET: &AsciiSet = &NON_ALPHANUMERIC + .remove(b'!') + .remove(b'#') + .remove(b'$') + .remove(b'&') + .remove(b'+') + .remove(b'-') + .remove(b'.') + .remove(b'^') + .remove(b'_') + .remove(b'`') + .remove(b'|') + .remove(b'~'); + +#[allow(dead_code)] +pub(crate) enum PercentEncoding { + PathSegment, + AttrChar, + NoOp, +} + +impl PercentEncoding { + pub(crate) fn encode_headers(&self, name: &str, field: &PartMetadata) -> Vec { + let s = format!( + "Content-Disposition: form-data; {}{}{}", + self.format_parameter("name", name), + match field.file_name { + Some(ref file_name) => format!("; {}", self.format_filename(file_name)), + None => String::new(), + }, + match field.mime { + Some(ref mime) => format!("\r\nContent-Type: {}", mime), + None => "".to_string(), + }, + ); + field + .headers + .iter() + .fold(s.into_bytes(), |mut header, (k, v)| { + header.extend_from_slice(b"\r\n"); + header.extend_from_slice(k.as_str().as_bytes()); + header.extend_from_slice(b": "); + header.extend_from_slice(v.as_bytes()); + header + }) + } + + // According to RFC7578 Section 4.2, `filename*=` syntax is invalid. + // See https://github.com/seanmonstar/reqwest/issues/419. + fn format_filename(&self, filename: &str) -> String { + let legal_filename = filename + .replace("\\", "\\\\") + .replace("\"", "\\\"") + .replace("\r", "\\\r") + .replace("\n", "\\\n"); + format!("filename=\"{}\"", legal_filename) + } + + fn format_parameter(&self, name: &str, value: &str) -> String { + let legal_value = match *self { + PercentEncoding::PathSegment => { + percent_encoding::utf8_percent_encode(value, PATH_SEGMENT_ENCODE_SET).to_string() + } + PercentEncoding::AttrChar => { + percent_encoding::utf8_percent_encode(value, ATTR_CHAR_ENCODE_SET).to_string() + } + PercentEncoding::NoOp => value.to_string(), + }; + if value.len() == legal_value.len() { + // nothing has been percent encoded + format!("{}=\"{}\"", name, value) + } else { + // something has been percent encoded + format!("{}*=utf-8''{}", name, legal_value) + } + } +} + +fn fast_random() -> u64 { + use std::cell::Cell; + use std::collections::hash_map::RandomState; + use std::hash::{BuildHasher, Hasher}; + use std::num::Wrapping; + + thread_local! { + static RNG: Cell> = Cell::new(Wrapping(seed())); + } + + fn seed() -> u64 { + let seed = RandomState::new(); + + let mut out = 0; + let mut cnt = 0; + while out == 0 { + cnt += 1; + let mut hasher = seed.build_hasher(); + hasher.write_usize(cnt); + out = hasher.finish(); + } + out + } + + RNG.with(|rng| { + let mut n = rng.get(); + debug_assert_ne!(n.0, 0); + n ^= n >> 12; + n ^= n << 25; + n ^= n >> 27; + rng.set(n); + n.0.wrapping_mul(0x2545_f491_4f6c_dd1d) + }) +} + +fn gen_boundary() -> String { + let a = fast_random(); + let b = fast_random(); + let c = fast_random(); + let d = fast_random(); + + format!("{:016x}-{:016x}-{:016x}-{:016x}", a, b, c, d) +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::TryStreamExt; + use futures_util::{future, stream}; + use tokio::{self, runtime}; + + #[test] + fn form_empty() { + let form = Form::new(); + + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("new rt"); + let body = form.stream().into_stream(); + let s = body.map_ok(|try_c| try_c.to_vec()).try_concat(); + + let out = rt.block_on(s); + assert!(out.unwrap().is_empty()); + } + + #[test] + fn stream_to_end() { + let mut form = Form::new() + .part( + "reader1", + Part::stream(hyper::Body::wrap_stream(stream::once(future::ready::< + FormResult, + >( + Ok("part1".to_owned()), + )))) + .into(), + ) + .part("key1", Part::text("value1")) + .part( + "key2", + Part::text("value2").mime(mime_guess::mime::IMAGE_BMP), + ) + .part( + "reader2", + Part::stream(hyper::Body::wrap_stream(stream::once(future::ready::< + FormResult, + >( + Ok("part2".to_owned()), + )))), + ) + .part("key3", Part::text("value3").file_name("filename")); + form.inner.boundary = "boundary".to_string(); + let expected = "--boundary\r\n\ + Content-Disposition: form-data; name=\"reader1\"\r\n\r\n\ + part1\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"key1\"\r\n\r\n\ + value1\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"key2\"\r\n\ + Content-Type: image/bmp\r\n\r\n\ + value2\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"reader2\"\r\n\r\n\ + part2\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\ + value3\r\n--boundary--\r\n"; + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("new rt"); + let body = form.stream().into_stream(); + let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat(); + + let out = rt.block_on(s).unwrap(); + // These prints are for debug purposes in case the test fails + println!( + "START REAL\n{}\nEND REAL", + std::str::from_utf8(&out).unwrap() + ); + println!("START EXPECTED\n{}\nEND EXPECTED", expected); + assert_eq!(std::str::from_utf8(&out).unwrap(), expected); + } + + #[test] + fn stream_to_end_with_header() { + let mut part = Part::text("value2").mime(mime_guess::mime::IMAGE_BMP); + part.meta.headers.insert("Hdr3", "/a/b/c".parse().unwrap()); + let mut form = Form::new().part("key2", part); + form.inner.boundary = "boundary".to_string(); + let expected = "--boundary\r\n\ + Content-Disposition: form-data; name=\"key2\"\r\n\ + Content-Type: image/bmp\r\n\ + hdr3: /a/b/c\r\n\ + \r\n\ + value2\r\n\ + --boundary--\r\n"; + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("new rt"); + let body = form.stream().into_stream(); + let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat(); + + let out = rt.block_on(s).unwrap(); + // These prints are for debug purposes in case the test fails + println!( + "START REAL\n{}\nEND REAL", + std::str::from_utf8(&out).unwrap() + ); + println!("START EXPECTED\n{}\nEND EXPECTED", expected); + assert_eq!(std::str::from_utf8(&out).unwrap(), expected); + } + + #[test] + fn correct_content_length() { + // Setup an arbitrary data stream + let stream_data = b"just some stream data"; + let stream_len = stream_data.len(); + let stream_data = stream_data + .chunks(3) + .map(|c| Ok::<_, std::io::Error>(Bytes::from(c))); + let the_stream = futures_util::stream::iter(stream_data); + + let bytes_data = b"some bytes data".to_vec(); + let bytes_len = bytes_data.len(); + + let stream_part = + Part::stream_with_length(hyper::Body::wrap_stream(the_stream), stream_len as u64); + let body_part = Part::bytes(bytes_data); + + // A simple check to make sure we get the configured body length + assert_eq!(stream_part.value_len().unwrap(), stream_len as u64); + + // Make sure it delegates to the underlying body if length is not specified + assert_eq!(body_part.value_len().unwrap(), bytes_len as u64); + } + + #[test] + fn header_percent_encoding() { + let name = "start%'\"\r\nßend"; + let field = Part::text(""); + + assert_eq!( + PercentEncoding::PathSegment.encode_headers(name, &field.meta), + &b"Content-Disposition: form-data; name*=utf-8''start%25'%22%0D%0A%C3%9Fend"[..] + ); + + assert_eq!( + PercentEncoding::AttrChar.encode_headers(name, &field.meta), + &b"Content-Disposition: form-data; name*=utf-8''start%25%27%22%0D%0A%C3%9Fend"[..] + ); + } +} diff --git a/ddprof-exporter/tests/form.rs b/ddprof-exporter/tests/form.rs index 35a15aa..73a31e6 100644 --- a/ddprof-exporter/tests/form.rs +++ b/ddprof-exporter/tests/form.rs @@ -1,8 +1,7 @@ // Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present Datadog, Inc. -use ddprof_exporter::{Endpoint, File, ProfileExporterV3, Tag}; -use reqwest::Url; +use ddprof_exporter::{Endpoint, File, ProfileExporterV3, Tag, Request}; use std::borrow::Cow; use std::error::Error; use std::io::Read; @@ -18,7 +17,7 @@ fn open>(path: P) -> Result, Box> { Ok(buffer) } -fn multipart(exporter: &ProfileExporterV3) -> reqwest::Request { +fn multipart(exporter: &ProfileExporterV3) -> Request { let small_pprof_name = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/profile.pprof"); let buffer = open(small_pprof_name).expect("to open file and read its bytes"); @@ -37,10 +36,8 @@ fn multipart(exporter: &ProfileExporterV3) -> reqwest::Request { .build(start, end, files, timeout) .expect("request to be built"); - let actual_timeout = *request.timeout().expect("timeout to exist"); + let actual_timeout = request.timeout().expect("timeout to exist"); assert_eq!(actual_timeout, timeout); - - assert!(request.body().is_some()); request } @@ -59,7 +56,7 @@ fn default_tags() -> Vec { #[test] fn multipart_agent() { - let base_url = Url::parse("http://localhost:8126").expect("url to parse"); + let base_url = "http://localhost:8126".parse().expect("url to parse"); let endpoint = Endpoint::agent(base_url).expect("endpoint to construct"); let exporter = ProfileExporterV3::new("php", default_tags(), endpoint).expect("exporter to construct"); @@ -67,7 +64,7 @@ fn multipart_agent() { let request = multipart(&exporter); assert_eq!( - request.url().as_str(), + request.uri().to_string(), "http://localhost:8126/profiling/v1/input" ); @@ -85,7 +82,7 @@ fn multipart_agentless() { let request = multipart(&exporter); assert_eq!( - request.url().as_str(), + request.uri().to_string(), "https://intake.profile.datadoghq.com/v1/input" ); diff --git a/ddprof-ffi/Cargo.toml b/ddprof-ffi/Cargo.toml index c91470e..b04d429 100644 --- a/ddprof-ffi/Cargo.toml +++ b/ddprof-ffi/Cargo.toml @@ -17,4 +17,4 @@ chrono = "0.4" ddprof-exporter = { path = "../ddprof-exporter", version = "0.4.0-rc.1" } ddprof-profiles = { path = "../ddprof-profiles", version = "0.4.0-rc.1" } libc = "0.2" -reqwest = { version = "0.11", features = ["blocking", "multipart", "rustls-tls"], default-features = false } +hyper = { version = "0.14", default-features = false } diff --git a/ddprof-ffi/src/exporter.rs b/ddprof-ffi/src/exporter.rs index 459a59a..27fda3a 100644 --- a/ddprof-ffi/src/exporter.rs +++ b/ddprof-ffi/src/exporter.rs @@ -4,7 +4,7 @@ use crate::{Buffer, Slice, Timespec}; use ddprof_exporter as exporter; use exporter::{Exporter, ProfileExporterV3}; -use reqwest::header::HeaderMap; +use hyper::header::HeaderMap; use std::borrow::Cow; use std::convert::TryInto; use std::ffi::CStr; @@ -86,17 +86,17 @@ pub unsafe extern "C" fn exporter_send( Some(non_null_exporter) => { let exporter = non_null_exporter.as_ref(); - match || -> Result> { + match || -> Result, Box> { let mut headers_map = HeaderMap::with_capacity(headers.len); for field in headers.into_slice().iter() { let name = CStr::from_ptr((*field).name); let value = (*field).value.try_into()?; - let header = reqwest::header::HeaderValue::from_str(value)?; + let header = hyper::header::HeaderValue::from_str(value)?; headers_map.insert(name.to_str()?, header); } - let method = reqwest::Method::from_bytes(CStr::from_ptr(http_method).to_bytes())?; + let method = hyper::Method::from_bytes(CStr::from_ptr(http_method).to_bytes())?; let url_str = CStr::from_ptr(url).to_str()?; let body_slice: &[u8] = body.into(); let timeout = Duration::from_millis(timeout_ms); @@ -160,7 +160,7 @@ pub struct File<'a> { /// This type only exists to workaround a bug in cbindgen; may be removed in the /// future. -pub struct Request(reqwest::Request); +pub struct Request(exporter::Request); #[repr(C)] /// cbindgen:field-names=[code] @@ -229,9 +229,10 @@ fn try_to_tags(tags: Slice) -> Result, Box Result> { +fn try_to_url(slice: ByteSlice) -> Result> { + use std::str::FromStr; let str = slice.try_into()?; - match reqwest::Url::parse(str) { + match hyper::Uri::from_str(str) { Ok(url) => Ok(url), Err(err) => Err(Box::new(err)), } From b0ad5ffc949397a46331758b1c91eca80b0cb51e Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 25 Feb 2022 15:58:44 +0100 Subject: [PATCH 02/14] Create a Connector that can handle uniux socket streams --- ddprof-exporter/src/connector.rs | 129 +++++++++++++++++++++++++++++++ ddprof-exporter/src/lib.rs | 5 +- 2 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 ddprof-exporter/src/connector.rs diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs new file mode 100644 index 0000000..548ed44 --- /dev/null +++ b/ddprof-exporter/src/connector.rs @@ -0,0 +1,129 @@ +use std::error::Error; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +#[derive(Clone)] +struct UnixConnector(); + +impl hyper::service::Service for UnixConnector { + type Response = tokio::net::UnixStream; + type Error = Box; + type Future = Pin> + Send>>; + + fn call(&mut self, uri: hyper::Uri) -> Self::Future { + Box::pin(async move { Ok(tokio::net::UnixStream::connect(uri.path()).await?) }) + } + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[derive(Clone)] +pub struct Connector { + tcp: hyper::client::HttpConnector, +} + +impl Connector { + pub(crate) fn new() -> Self { + Self { + tcp: hyper::client::HttpConnector::new(), + } + } +} + +pin_project! { + #[project = ConnStreamProj] + pub enum ConnStream { + Tcp{ #[pin] transport: tokio::net::TcpStream }, + Udp{ #[pin] transport: tokio::net::UnixStream }, + } +} + +impl tokio::io::AsyncRead for ConnStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_read(cx, buf), + ConnStreamProj::Udp { transport } => transport.poll_read(cx, buf), + } + } +} + +impl hyper::client::connect::Connection for ConnStream { + fn connected(&self) -> hyper::client::connect::Connected { + match self { + Self::Tcp { transport } => transport.connected(), + Self::Udp { transport: _ } => hyper::client::connect::Connected::new(), + } + } +} + +impl tokio::io::AsyncWrite for ConnStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_write(cx, buf), + ConnStreamProj::Udp { transport } => transport.poll_write(cx, buf), + } + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_shutdown(cx), + ConnStreamProj::Udp { transport } => transport.poll_shutdown(cx), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_flush(cx), + ConnStreamProj::Udp { transport } => transport.poll_flush(cx), + } + } +} + +impl hyper::service::Service for Connector { + type Response = ConnStream; + type Error = Box; + type Future = Pin> + Send>>; + + fn call(&mut self, uri: hyper::Uri) -> Self::Future { + match uri.scheme_str() { + Some("unix") => Box::pin(async move { + Ok(ConnStream::Udp { + transport: tokio::net::UnixStream::connect(uri.path()).await?, + }) + }), + _ => { + let fut = self.tcp.call(uri); + Box::pin(async { + Ok(ConnStream::Tcp { + transport: fut.await?, + }) + }) + } + } + } + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tcp.poll_ready(cx).map_err(|e| e.into()) + } +} + +#[test] +fn test_hyper_client_from_connector() { + let _: hyper::Client = hyper::Client::builder().build(Connector::new()); +} diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index e62b181..670b2b7 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -12,11 +12,12 @@ use tokio::runtime::Runtime; mod container_id; mod multipart; +mod connector; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; -type Client = hyper::Client; +type Client = hyper::Client; pub struct Exporter { client: Client, @@ -224,7 +225,7 @@ impl Exporter { // Set idle to 0, which prevents the pipe being broken every 2nd request let client = hyper::Client::builder() .pool_max_idle_per_host(0) - .build(hyper::client::HttpConnector::new()); + .build(connector::Connector::new()); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; From 752f47afd20eab3e9f03046fc30e267beedf3a1a Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 25 Feb 2022 17:27:24 +0100 Subject: [PATCH 03/14] Add rustls hyper support --- Cargo.lock | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 87873e8..a85c3d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,16 +56,15 @@ dependencies = [ [[package]] name = "ddprof" -version = "0.3.0" +version = "0.4.0-rc.1" dependencies = [ "ddprof-exporter", - "ddprof-ffi", "ddprof-profiles", ] [[package]] name = "ddprof-exporter" -version = "0.3.0" +version = "0.4.0-rc.1" dependencies = [ "bytes", "chrono", @@ -87,7 +86,7 @@ dependencies = [ [[package]] name = "ddprof-ffi" -version = "0.3.0" +version = "0.4.0-rc.1" dependencies = [ "chrono", "ddprof-exporter", @@ -98,7 +97,7 @@ dependencies = [ [[package]] name = "ddprof-profiles" -version = "0.3.0" +version = "0.4.0-rc.1" dependencies = [ "indexmap", "libc", From c93e952d5d383a79355cedb7dd69f1ec4574cc9e Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 25 Feb 2022 17:27:24 +0100 Subject: [PATCH 04/14] Add rustls hyper support --- Cargo.lock | 257 +++++++++++++++++++++++++++++++ ddprof-exporter/Cargo.toml | 1 + ddprof-exporter/src/connector.rs | 10 +- 3 files changed, 265 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a85c3d3..2111f68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,18 +23,36 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + [[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bumpalo" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" + [[package]] name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + [[package]] name = "cfg-if" version = "1.0.0" @@ -54,6 +72,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "ddprof" version = "0.4.0-rc.1" @@ -74,6 +108,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "lazy_static", "libc", "maplit", @@ -301,6 +336,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" +dependencies = [ + "http", + "hyper", + "rustls", + "rustls-native-certs", + "tokio", + "tokio-rustls", +] + [[package]] name = "indexmap" version = "1.7.0" @@ -326,6 +375,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "js-sys" +version = "0.3.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -431,6 +489,18 @@ dependencies = [ "autocfg", ] +[[package]] +name = "once_cell" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "percent-encoding" version = "2.1.0" @@ -621,6 +691,96 @@ dependencies = [ "winapi", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + +[[package]] +name = "rustls" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fbfeb8d0ddb84706bc597a5574ab8912817c52a397f819e5b614e2265206921" +dependencies = [ + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "security-framework" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "slab" version = "0.4.5" @@ -637,6 +797,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "syn" version = "1.0.81" @@ -685,6 +851,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "tokio-rustls" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tower-service" version = "0.3.1" @@ -738,6 +915,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "ux" version = "0.1.3" @@ -766,6 +949,80 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasm-bindgen" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" + +[[package]] +name = "web-sys" +version = "0.3.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "which" version = "4.2.2" diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index bb13033..16885f3 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -26,6 +26,7 @@ futures-util = { version = "0.3.0", default-features = false } mime_guess = { version = "2.0", default-features = false } http-body = "0.4" pin-project-lite = "0.2.0" +hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1"] } [dev-dependencies] maplit = "1.0" diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index 548ed44..b7d0994 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -24,13 +24,17 @@ impl hyper::service::Service for UnixConnector { #[derive(Clone)] pub struct Connector { - tcp: hyper::client::HttpConnector, + tcp: hyper_rustls::HttpsConnector, } impl Connector { pub(crate) fn new() -> Self { Self { - tcp: hyper::client::HttpConnector::new(), + tcp: hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http1() + .build(), } } } @@ -38,7 +42,7 @@ impl Connector { pin_project! { #[project = ConnStreamProj] pub enum ConnStream { - Tcp{ #[pin] transport: tokio::net::TcpStream }, + Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, Udp{ #[pin] transport: tokio::net::UnixStream }, } } From c76a7c373773f320a48ee167951f7f5302c70c02 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Mon, 28 Feb 2022 11:35:58 +0100 Subject: [PATCH 05/14] Rename CLient to HttpClient to work around cbindgen --- ddprof-exporter/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index 670b2b7..78dece5 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -17,10 +17,10 @@ mod connector; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; -type Client = hyper::Client; +type HttpClient = hyper::Client; pub struct Exporter { - client: Client, + client: HttpClient, runtime: Runtime, } @@ -90,7 +90,7 @@ impl Request { async fn send( self, - client: &Client, + client: &HttpClient, ) -> Result, Box> { Ok(match self.timeout { Some(t) => tokio::time::timeout(t, client.request(self.req)).await?, From 5df3c97df58515ed852778b7b926113cdd958a6d Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 2 Mar 2022 13:45:32 +0100 Subject: [PATCH 06/14] handle unix socket path correctly --- Cargo.lock | 7 ++++++ ddprof-exporter/Cargo.toml | 1 + ddprof-exporter/src/connector.rs | 38 +++++++++++++++++++++++++++++--- ddprof-exporter/src/errors.rs | 19 ++++++++++++++++ ddprof-exporter/src/lib.rs | 5 ++++- ddprof-ffi/src/exporter.rs | 7 ++++-- ffi-build.sh | 2 +- 7 files changed, 72 insertions(+), 7 deletions(-) create mode 100644 ddprof-exporter/src/errors.rs diff --git a/Cargo.lock b/Cargo.lock index 2111f68..5b14b3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,6 +105,7 @@ dependencies = [ "futures", "futures-core", "futures-util", + "hex", "http", "http-body", "hyper", @@ -279,6 +280,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.5" diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index 16885f3..fa65f17 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -27,6 +27,7 @@ mime_guess = { version = "2.0", default-features = false } http-body = "0.4" pin-project-lite = "0.2.0" hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1"] } +hex = "0.4" [dev-dependencies] maplit = "1.0" diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index b7d0994..26a6ff9 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -1,10 +1,41 @@ use std::error::Error; use std::future::Future; +use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; use pin_project_lite::pin_project; +/// Creates a new Uri, with the `unix` scheme, and the path to the socket +/// encoded as a hex string, to prevent special characters in the url authority +pub fn socket_path_to_uri(path: &str) -> Result> { + let path = hex::encode(path); + Ok(hyper::Uri::builder() + .scheme("unix") + .authority(path) + .path_and_query("") + .build()?) +} + +/// Decodes +pub fn socket_path_from_uri( + uri: &hyper::Uri, +) -> Result> { + if uri.scheme_str() != Some("unix") { + return Err(crate::errors::Error::InvalidUrl.into()); + } + let path = String::from_utf8( + hex::decode( + uri.authority() + .ok_or(crate::errors::Error::InvalidUrl)? + .as_str(), + ) + .map_err(|_| crate::errors::Error::InvalidUrl)?, + ) + .map_err(|_| crate::errors::Error::InvalidUrl)?; + Ok(PathBuf::from(path)) +} + #[derive(Clone)] struct UnixConnector(); @@ -23,7 +54,7 @@ impl hyper::service::Service for UnixConnector { } #[derive(Clone)] -pub struct Connector { +pub(crate) struct Connector { tcp: hyper_rustls::HttpsConnector, } @@ -41,7 +72,7 @@ impl Connector { pin_project! { #[project = ConnStreamProj] - pub enum ConnStream { + pub(crate) enum ConnStream { Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, Udp{ #[pin] transport: tokio::net::UnixStream }, } @@ -107,8 +138,9 @@ impl hyper::service::Service for Connector { fn call(&mut self, uri: hyper::Uri) -> Self::Future { match uri.scheme_str() { Some("unix") => Box::pin(async move { + let path = socket_path_from_uri(&uri)?; Ok(ConnStream::Udp { - transport: tokio::net::UnixStream::connect(uri.path()).await?, + transport: tokio::net::UnixStream::connect(path).await?, }) }), _ => { diff --git a/ddprof-exporter/src/errors.rs b/ddprof-exporter/src/errors.rs new file mode 100644 index 0000000..d409986 --- /dev/null +++ b/ddprof-exporter/src/errors.rs @@ -0,0 +1,19 @@ +use std::error; +use std::fmt; + +#[derive(Clone, Debug)] +pub(crate) enum Error { + InvalidUrl, + OperationTimedOut, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Self::InvalidUrl => "invalid url", + Self::OperationTimedOut => "operation timed out", + }) + } +} + +impl error::Error for Error {} diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index 78dece5..e11bfd5 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -10,9 +10,12 @@ use hyper::header::HeaderValue; use hyper::Uri; use tokio::runtime::Runtime; +mod connector; mod container_id; +mod errors; mod multipart; -mod connector; + +pub use connector::socket_path_to_uri; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; diff --git a/ddprof-ffi/src/exporter.rs b/ddprof-ffi/src/exporter.rs index 27fda3a..148c41b 100644 --- a/ddprof-ffi/src/exporter.rs +++ b/ddprof-ffi/src/exporter.rs @@ -11,6 +11,7 @@ use std::ffi::CStr; use std::io::Write; use std::os::raw::c_char; use std::ptr::NonNull; +use std::str::FromStr; use std::time::Duration; #[repr(C)] @@ -230,8 +231,10 @@ fn try_to_tags(tags: Slice) -> Result, Box Result> { - use std::str::FromStr; - let str = slice.try_into()?; + let str: &str = slice.try_into()?; + if let Some(path) = str.strip_prefix("unix://") { + return Ok(ddprof_exporter::socket_path_to_uri(path)?); + } match hyper::Uri::from_str(str) { Ok(url) => Ok(url), Err(err) => Err(Box::new(err)), diff --git a/ffi-build.sh b/ffi-build.sh index 60444e4..ddd2333 100755 --- a/ffi-build.sh +++ b/ffi-build.sh @@ -33,7 +33,7 @@ case "$target" in remove_rpath=1 ;; "x86_64-apple-darwin") - expected_native_static_libs=" -framework Security -liconv -lSystem -lresolv -lc -lm -liconv" + expected_native_static_libs=" -framework Security -framework CoreFoundation -liconv -lSystem -lresolv -lc -lm -liconv" native_static_libs="${expected_native_static_libs}" shared_library_suffix=".dylib" # fix usage of library in macos via rpath From 183bc5dceded20b8803f3715b482cc2525ff0316 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 2 Mar 2022 13:45:55 +0100 Subject: [PATCH 07/14] Better error message when the request times out --- ddprof-exporter/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index e11bfd5..cee129d 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -96,7 +96,9 @@ impl Request { client: &HttpClient, ) -> Result, Box> { Ok(match self.timeout { - Some(t) => tokio::time::timeout(t, client.request(self.req)).await?, + Some(t) => tokio::time::timeout(t, client.request(self.req)) + .await + .map_err(|_| crate::errors::Error::OperationTimedOut)?, None => client.request(self.req).await, }?) } From a23806221b4a9ca0603c12943ae9f87c9ea7a0ed Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 2 Mar 2022 13:46:36 +0100 Subject: [PATCH 08/14] Separate original code from adapter in multipart handling --- ddprof-exporter/src/multipart.rs | 60 +++++++++++++++++--------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/ddprof-exporter/src/multipart.rs b/ddprof-exporter/src/multipart.rs index 144d895..4690a3c 100644 --- a/ddprof-exporter/src/multipart.rs +++ b/ddprof-exporter/src/multipart.rs @@ -13,8 +13,40 @@ use http_body::Body as HttpBody; use mime_guess::Mime; use percent_encoding::{self, AsciiSet, NON_ALPHANUMERIC}; +// ==== Type adapters to remove the dependency on reqwest ==== + type FormResult = std::result::Result>; +pin_project_lite::pin_project! { + #[derive(Debug)] + pub (crate) struct FormBody { + #[pin] + inner: hyper::Body + } +} + +impl FormBody { + fn content_length(&self) -> Option { + ::size_hint(&self.inner).exact() + } +} + +impl Stream for FormBody { + type Item = std::result::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().inner.poll_data(cx) + } +} + +impl From for FormBody { + fn from(inner: hyper::Body) -> Self { + Self { inner } + } +} + +// ==== Original code from https://github.com/seanmonstar/reqwest/blob/master/src/async_impl/multipart.rs ==== + /// An async multipart/form-data request. pub struct Form { inner: FormParts, @@ -45,34 +77,6 @@ pub(crate) trait PartProps { fn metadata(&self) -> &PartMetadata; } -pin_project_lite::pin_project! { - #[derive(Debug)] - pub (crate) struct FormBody { - #[pin] - inner: hyper::Body - } -} - -impl FormBody { - fn content_length(&self) -> Option { - ::size_hint(&self.inner).exact() - } -} - -impl Stream for FormBody { - type Item = std::result::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.project().inner.poll_data(cx) - } -} - -impl From for FormBody { - fn from(inner: hyper::Body) -> Self { - Self { inner } - } -} - // ===== impl Form ===== impl Default for Form { From 8d517aa1546dc1453b4be411a742d1a206879212 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 2 Mar 2022 13:56:52 +0100 Subject: [PATCH 09/14] Add tests for uds handling --- ddprof-exporter/src/connector.rs | 41 +++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index 26a6ff9..c6e485a 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -159,7 +159,42 @@ impl hyper::service::Service for Connector { } } -#[test] -fn test_hyper_client_from_connector() { - let _: hyper::Client = hyper::Client::builder().build(Connector::new()); +#[cfg(test)] +mod tests { + use std::path::Path; + + use super::*; + + #[test] + /// Verify that the Connector type implements the correct bound Connect + Clone + /// to be able to use the hyper::Client + fn test_hyper_client_from_connector() { + let _: hyper::Client = hyper::Client::builder().build(Connector::new()); + } + + #[test] + fn test_encode_unix_socket_path_absolute() { + let expected_path = "/path/to/a/socket.sock"; + let uri = socket_path_to_uri(expected_path).unwrap(); + assert_eq!(uri.scheme_str(), Some("unix")); + + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)) + } + + #[test] + fn test_encode_unix_socket_relative_path() { + let expected_path = "relative/path/to/a/socket.sock"; + let uri = socket_path_to_uri(expected_path).unwrap(); + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)); + + let expected_path = "./relative/path/to/a/socket.sock"; + let uri = socket_path_to_uri(expected_path).unwrap(); + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)); + } } + + + From 7667e3ddf4709f728958aefa313aa6c7ab832532 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 2 Mar 2022 15:50:55 +0100 Subject: [PATCH 10/14] skip uds feature on non unix systems because tokio doesn't support it --- ddprof-exporter/src/connector.rs | 158 ++++++++++++++++--------------- ddprof-exporter/src/errors.rs | 3 + ddprof-exporter/src/lib.rs | 13 ++- ddprof-ffi/src/exporter.rs | 3 +- 4 files changed, 98 insertions(+), 79 deletions(-) diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index c6e485a..3ef58ba 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -1,55 +1,83 @@ use std::error::Error; use std::future::Future; -use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; -use pin_project_lite::pin_project; - -/// Creates a new Uri, with the `unix` scheme, and the path to the socket -/// encoded as a hex string, to prevent special characters in the url authority -pub fn socket_path_to_uri(path: &str) -> Result> { - let path = hex::encode(path); - Ok(hyper::Uri::builder() - .scheme("unix") - .authority(path) - .path_and_query("") - .build()?) -} - -/// Decodes -pub fn socket_path_from_uri( - uri: &hyper::Uri, -) -> Result> { - if uri.scheme_str() != Some("unix") { - return Err(crate::errors::Error::InvalidUrl.into()); +// Tokio doesn't handle unix sockets on windows +#[cfg(unix)] +pub(crate) mod uds { + use pin_project_lite::pin_project; + use std::error::Error; + use std::ffi::OsString; + use std::os::unix::ffi::{OsStrExt, OsStringExt}; + use std::path::{Path, PathBuf}; + + /// Creates a new Uri, with the `unix` scheme, and the path to the socket + /// encoded as a hex string, to prevent special characters in the url authority + pub fn socket_path_to_uri(path: &Path) -> Result> { + let path = hex::encode(path.as_os_str().as_bytes()); + Ok(hyper::Uri::builder() + .scheme("unix") + .authority(path) + .path_and_query("") + .build()?) } - let path = String::from_utf8( - hex::decode( + + pub fn socket_path_from_uri( + uri: &hyper::Uri, + ) -> Result> { + if uri.scheme_str() != Some("unix") { + return Err(crate::errors::Error::InvalidUrl.into()); + } + let path = hex::decode( uri.authority() .ok_or(crate::errors::Error::InvalidUrl)? .as_str(), ) - .map_err(|_| crate::errors::Error::InvalidUrl)?, - ) - .map_err(|_| crate::errors::Error::InvalidUrl)?; - Ok(PathBuf::from(path)) -} + .map_err(|_| crate::errors::Error::InvalidUrl)?; + Ok(PathBuf::from(OsString::from_vec(path))) + } -#[derive(Clone)] -struct UnixConnector(); + #[test] + fn test_encode_unix_socket_path_absolute() { + let expected_path = "/path/to/a/socket.sock".as_ref(); + let uri = socket_path_to_uri(expected_path).unwrap(); + assert_eq!(uri.scheme_str(), Some("unix")); -impl hyper::service::Service for UnixConnector { - type Response = tokio::net::UnixStream; - type Error = Box; - type Future = Pin> + Send>>; + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)) + } - fn call(&mut self, uri: hyper::Uri) -> Self::Future { - Box::pin(async move { Ok(tokio::net::UnixStream::connect(uri.path()).await?) }) + #[test] + fn test_encode_unix_socket_relative_path() { + let expected_path = "relative/path/to/a/socket.sock".as_ref(); + let uri = socket_path_to_uri(expected_path).unwrap(); + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)); + + let expected_path = "./relative/path/to/a/socket.sock".as_ref(); + let uri = socket_path_to_uri(expected_path).unwrap(); + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)); + } + + pin_project! { + #[project = ConnStreamProj] + pub(crate) enum ConnStream { + Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, + Udp{ #[pin] transport: tokio::net::UnixStream }, + } } +} + +#[cfg(unix)] +use uds::{ConnStream, ConnStreamProj}; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) +#[cfg(not(unix))] +pin_project_lite::pin_project! { + #[project = ConnStreamProj] + pub(crate) enum ConnStream { + Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, } } @@ -70,14 +98,6 @@ impl Connector { } } -pin_project! { - #[project = ConnStreamProj] - pub(crate) enum ConnStream { - Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, - Udp{ #[pin] transport: tokio::net::UnixStream }, - } -} - impl tokio::io::AsyncRead for ConnStream { fn poll_read( self: Pin<&mut Self>, @@ -86,6 +106,7 @@ impl tokio::io::AsyncRead for ConnStream { ) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_read(cx, buf), + #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_read(cx, buf), } } @@ -95,6 +116,7 @@ impl hyper::client::connect::Connection for ConnStream { fn connected(&self) -> hyper::client::connect::Connected { match self { Self::Tcp { transport } => transport.connected(), + #[cfg(unix)] Self::Udp { transport: _ } => hyper::client::connect::Connected::new(), } } @@ -108,6 +130,7 @@ impl tokio::io::AsyncWrite for ConnStream { ) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_write(cx, buf), + #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_write(cx, buf), } } @@ -118,6 +141,7 @@ impl tokio::io::AsyncWrite for ConnStream { ) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_shutdown(cx), + #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_shutdown(cx), } } @@ -125,6 +149,7 @@ impl tokio::io::AsyncWrite for ConnStream { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_flush(cx), + #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_flush(cx), } } @@ -138,10 +163,17 @@ impl hyper::service::Service for Connector { fn call(&mut self, uri: hyper::Uri) -> Self::Future { match uri.scheme_str() { Some("unix") => Box::pin(async move { - let path = socket_path_from_uri(&uri)?; - Ok(ConnStream::Udp { - transport: tokio::net::UnixStream::connect(path).await?, - }) + #[cfg(unix)] + { + let path = uds::socket_path_from_uri(&uri)?; + Ok(ConnStream::Udp { + transport: tokio::net::UnixStream::connect(path).await?, + }) + } + #[cfg(not(unix))] + { + Err(crate::errors::Error::UnixSockeUnsuported.into()) + } }), _ => { let fut = self.tcp.call(uri); @@ -161,8 +193,6 @@ impl hyper::service::Service for Connector { #[cfg(test)] mod tests { - use std::path::Path; - use super::*; #[test] @@ -171,30 +201,4 @@ mod tests { fn test_hyper_client_from_connector() { let _: hyper::Client = hyper::Client::builder().build(Connector::new()); } - - #[test] - fn test_encode_unix_socket_path_absolute() { - let expected_path = "/path/to/a/socket.sock"; - let uri = socket_path_to_uri(expected_path).unwrap(); - assert_eq!(uri.scheme_str(), Some("unix")); - - let actual_path = socket_path_from_uri(&uri).unwrap(); - assert_eq!(actual_path.as_path(), Path::new(expected_path)) - } - - #[test] - fn test_encode_unix_socket_relative_path() { - let expected_path = "relative/path/to/a/socket.sock"; - let uri = socket_path_to_uri(expected_path).unwrap(); - let actual_path = socket_path_from_uri(&uri).unwrap(); - assert_eq!(actual_path.as_path(), Path::new(expected_path)); - - let expected_path = "./relative/path/to/a/socket.sock"; - let uri = socket_path_to_uri(expected_path).unwrap(); - let actual_path = socket_path_from_uri(&uri).unwrap(); - assert_eq!(actual_path.as_path(), Path::new(expected_path)); - } } - - - diff --git a/ddprof-exporter/src/errors.rs b/ddprof-exporter/src/errors.rs index d409986..3621810 100644 --- a/ddprof-exporter/src/errors.rs +++ b/ddprof-exporter/src/errors.rs @@ -2,9 +2,11 @@ use std::error; use std::fmt; #[derive(Clone, Debug)] +#[allow(dead_code)] pub(crate) enum Error { InvalidUrl, OperationTimedOut, + UnixSockeUnsuported } impl fmt::Display for Error { @@ -12,6 +14,7 @@ impl fmt::Display for Error { f.write_str(match self { Self::InvalidUrl => "invalid url", Self::OperationTimedOut => "operation timed out", + Self::UnixSockeUnsuported => "unix sockets unsuported on windows" }) } } diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index cee129d..7c08adb 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -15,7 +15,8 @@ mod container_id; mod errors; mod multipart; -pub use connector::socket_path_to_uri; +#[cfg(unix)] +pub use connector::uds::socket_path_to_uri; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; @@ -129,6 +130,16 @@ impl Endpoint { Ok(Endpoint { url, api_key: None }) } + /// Creates an Endpoint for talking to the Datadog agent though a unix socket. + /// + /// # Arguments + /// * `socket_path` - file system path to the socket + #[cfg(unix)] + pub fn agent_uds(path: &std::path::Path) -> Result> { + let base_url = socket_path_to_uri(path)?; + Self::agent(base_url) + } + /// Creates an Endpoint for talking to Datadog intake without using the agent. /// This is an experimental feature. /// diff --git a/ddprof-ffi/src/exporter.rs b/ddprof-ffi/src/exporter.rs index 148c41b..99e85a1 100644 --- a/ddprof-ffi/src/exporter.rs +++ b/ddprof-ffi/src/exporter.rs @@ -232,8 +232,9 @@ fn try_to_tags(tags: Slice) -> Result, Box Result> { let str: &str = slice.try_into()?; + #[cfg(unix)] if let Some(path) = str.strip_prefix("unix://") { - return Ok(ddprof_exporter::socket_path_to_uri(path)?); + return Ok(ddprof_exporter::socket_path_to_uri(path.as_ref())?); } match hyper::Uri::from_str(str) { Ok(url) => Ok(url), From 56e4f4620cc3944f7877b8203dc27aeb27fc2e89 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 9 Mar 2022 14:22:32 +0100 Subject: [PATCH 11/14] Switch the multipart implementation to hyper-multipart-rfc7578 --- Cargo.lock | 48 ++ ddprof-exporter/Cargo.toml | 1 + ddprof-exporter/src/lib.rs | 42 +- ddprof-exporter/src/multipart.rs | 745 ------------------------------- 4 files changed, 66 insertions(+), 770 deletions(-) delete mode 100644 ddprof-exporter/src/multipart.rs diff --git a/Cargo.lock b/Cargo.lock index 5b14b3d..724ba88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,6 +72,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "common-multipart-rfc7578" +version = "0.4.2" +source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=0e0812e0241601a46986c062d1c4ba8574f437a5#0e0812e0241601a46986c062d1c4ba8574f437a5" +dependencies = [ + "bytes", + "futures-core", + "futures-util", + "http", + "mime", + "mime_guess", + "rand", + "thiserror", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -109,6 +124,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-multipart-rfc7578", "hyper-rustls", "lazy_static", "libc", @@ -343,6 +359,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-multipart-rfc7578" +version = "0.6.2" +source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=0e0812e0241601a46986c062d1c4ba8574f437a5#0e0812e0241601a46986c062d1c4ba8574f437a5" +dependencies = [ + "bytes", + "common-multipart-rfc7578", + "futures-core", + "http", + "hyper", +] + [[package]] name = "hyper-rustls" version = "0.23.0" @@ -835,6 +863,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "thiserror" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.1.43" diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index fa65f17..a47a039 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -28,6 +28,7 @@ http-body = "0.4" pin-project-lite = "0.2.0" hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1"] } hex = "0.4" +hyper-multipart-rfc7578 = { git = "https://github.com/paullegranddc/rust-multipart-rfc7578.git", rev = "0e0812e0241601a46986c062d1c4ba8574f437a5" } [dev-dependencies] maplit = "1.0" diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index 7c08adb..599e72c 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -3,17 +3,18 @@ use std::borrow::Cow; use std::error::Error; +use std::io::Cursor; use std::str::FromStr; use bytes::Bytes; use hyper::header::HeaderValue; use hyper::Uri; +use hyper_multipart_rfc7578::client::multipart; use tokio::runtime::Runtime; mod connector; mod container_id; mod errors; -mod multipart; #[cfg(unix)] pub use connector::uds::socket_path_to_uri; @@ -178,39 +179,30 @@ impl ProfileExporterV3 { files: &[File], timeout: std::time::Duration, ) -> Result> { - let mut form = multipart::Form::new() - .text("version", "3") - .text("start", start.format("%Y-%m-%dT%H:%M:%SZ").to_string()) - .text("end", end.format("%Y-%m-%dT%H:%M:%SZ").to_string()) - .text("family", String::from(&self.family)); + let mut form = multipart::Form::default(); + + form.add_text("version", "3"); + form.add_text("start", start.format("%Y-%m-%dT%H:%M:%SZ").to_string()); + form.add_text("end", end.format("%Y-%m-%dT%H:%M:%SZ").to_string()); + form.add_text("family", String::from(&self.family)); for tag in self.tags.iter() { - form = form.text("tags[]", format!("{}:{}", tag.name, tag.value)); + form.add_text("tags[]", format!("{}:{}", tag.name, tag.value)); } - form = files.iter().fold(form, |form, file| -> multipart::Form { - let filename = file.name.to_owned(); - let bytes = multipart::Part::bytes(file.bytes.to_owned()) - .file_name(filename.clone()) - .mime_str("application/octet-stream") - .expect("mime to be valid"); - - form.part(format!("data[{}]", filename), bytes) - }); + for file in files { + form.add_reader( + format!("data[{}]", file.name), + Cursor::new(file.bytes.to_owned()), + ) + } let mut builder = hyper::Request::builder() .method(http::Method::POST) .uri(self.endpoint.url.clone()) .header("User-Agent", concat!("DDProf/", env!("CARGO_PKG_VERSION"))) - .header("Connection", "close") - .header( - "Content-type", - format!("multipart/form-data; boundary={}", form.boundary()).as_str(), - ); + .header("Connection", "close"); - if let Some(length) = form.compute_length() { - builder = builder.header("Content-Length", length); - } if let Some(api_key) = &self.endpoint.api_key { builder = builder.header( "DD-API-KEY", @@ -223,7 +215,7 @@ impl ProfileExporterV3 { } Ok( - Request::from(builder.body(hyper::Body::wrap_stream(form.stream()))?) + Request::from(form.set_body_convert::(builder)?) .with_timeout(timeout), ) } diff --git a/ddprof-exporter/src/multipart.rs b/ddprof-exporter/src/multipart.rs deleted file mode 100644 index 4690a3c..0000000 --- a/ddprof-exporter/src/multipart.rs +++ /dev/null @@ -1,745 +0,0 @@ -use std::borrow::Cow; -use std::fmt; -use std::pin::Pin; -use std::str::FromStr; -use std::task::{Context, Poll}; - -use bytes::Bytes; -use futures::TryStreamExt; -use futures_core::Stream; -use futures_util::{future, stream, StreamExt}; -use http::HeaderMap; -use http_body::Body as HttpBody; -use mime_guess::Mime; -use percent_encoding::{self, AsciiSet, NON_ALPHANUMERIC}; - -// ==== Type adapters to remove the dependency on reqwest ==== - -type FormResult = std::result::Result>; - -pin_project_lite::pin_project! { - #[derive(Debug)] - pub (crate) struct FormBody { - #[pin] - inner: hyper::Body - } -} - -impl FormBody { - fn content_length(&self) -> Option { - ::size_hint(&self.inner).exact() - } -} - -impl Stream for FormBody { - type Item = std::result::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.project().inner.poll_data(cx) - } -} - -impl From for FormBody { - fn from(inner: hyper::Body) -> Self { - Self { inner } - } -} - -// ==== Original code from https://github.com/seanmonstar/reqwest/blob/master/src/async_impl/multipart.rs ==== - -/// An async multipart/form-data request. -pub struct Form { - inner: FormParts, -} - -/// A field in a multipart form. -pub struct Part { - meta: PartMetadata, - value: FormBody, - body_length: Option, -} - -pub(crate) struct FormParts

{ - pub(crate) boundary: String, - pub(crate) computed_headers: Vec>, - pub(crate) fields: Vec<(Cow<'static, str>, P)>, - pub(crate) percent_encoding: PercentEncoding, -} - -pub(crate) struct PartMetadata { - mime: Option, - file_name: Option>, - pub(crate) headers: HeaderMap, -} - -pub(crate) trait PartProps { - fn value_len(&self) -> Option; - fn metadata(&self) -> &PartMetadata; -} - -// ===== impl Form ===== - -impl Default for Form { - fn default() -> Self { - Self::new() - } -} - -impl Form { - /// Creates a new async Form without any content. - pub fn new() -> Form { - Form { - inner: FormParts::new(), - } - } - - /// Get the boundary that this form will use. - #[inline] - pub fn boundary(&self) -> &str { - self.inner.boundary() - } - - /// Add a data field with supplied name and value. - /// - /// # Examples - /// - /// ```ignore - /// let form = multipart::Form::new() - /// .text("username", "seanmonstar") - /// .text("password", "secret"); - /// ``` - pub fn text(self, name: T, value: U) -> Form - where - T: Into>, - U: Into>, - { - self.part(name, Part::text(value)) - } - - /// Adds a customized Part. - pub fn part(self, name: T, part: Part) -> Form - where - T: Into>, - { - self.with_inner(move |inner| inner.part(name, part)) - } - - #[allow(dead_code)] - /// Configure this `Form` to percent-encode using the `path-segment` rules. - pub fn percent_encode_path_segment(self) -> Form { - self.with_inner(|inner| inner.percent_encode_path_segment()) - } - - #[allow(dead_code)] - /// Configure this `Form` to percent-encode using the `attr-char` rules. - pub fn percent_encode_attr_chars(self) -> Form { - self.with_inner(|inner| inner.percent_encode_attr_chars()) - } - - #[allow(dead_code)] - /// Configure this `Form` to skip percent-encoding - pub fn percent_encode_noop(self) -> Form { - self.with_inner(|inner| inner.percent_encode_noop()) - } - - /// Consume this instance and transform into an instance of Body for use in a request. - pub(crate) fn stream(mut self) -> FormBody { - if self.inner.fields.is_empty() { - return FormBody { - inner: hyper::Body::empty(), - }; - } - - // create initial part to init reduce chain - let (name, part) = self.inner.fields.remove(0); - let start = Box::pin(self.part_stream(name, part)) - as Pin> + Send + Sync>>; - - let fields = self.inner.take_fields(); - // for each field, chain an additional stream - let stream = fields.into_iter().fold(start, |memo, (name, part)| { - let part_stream = self.part_stream(name, part); - Box::pin(memo.chain(part_stream)) - as Pin> + Send + Sync>> - }); - // append special ending boundary - let last = stream::once(future::ready(Ok( - format!("--{}--\r\n", self.boundary()).into() - ))); - FormBody { - inner: hyper::Body::wrap_stream(stream.chain(last)), - } - } - - /// Generate a hyper::Body stream for a single Part instance of a Form request. - pub(crate) fn part_stream( - &mut self, - name: T, - part: Part, - ) -> impl Stream> - where - T: Into>, - { - // start with boundary - let boundary = stream::once(future::ready(Ok( - format!("--{}\r\n", self.boundary()).into() - ))); - // append headers - let header = stream::once(future::ready(Ok({ - let mut h = self - .inner - .percent_encoding - .encode_headers(&name.into(), &part.meta); - h.extend_from_slice(b"\r\n\r\n"); - h.into() - }))); - // then append form data followed by terminating CRLF - boundary - .chain(header) - .chain(part.value.map_err(|e| e.into())) - .chain(stream::once(future::ready(Ok("\r\n".into())))) - } - - pub(crate) fn compute_length(&mut self) -> Option { - self.inner.compute_length() - } - - fn with_inner(self, func: F) -> Self - where - F: FnOnce(FormParts) -> FormParts, - { - Form { - inner: func(self.inner), - } - } -} - -impl fmt::Debug for Form { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.inner.fmt_fields("Form", f) - } -} - -// ===== impl Part ===== - -impl Part { - /// Makes a text parameter. - pub fn text(value: T) -> Part - where - T: Into>, - { - let body = match value.into() { - Cow::Borrowed(slice) => hyper::Body::from(slice).into(), - Cow::Owned(string) => hyper::Body::from(string).into(), - }; - Part::new(body, None) - } - - /// Makes a new parameter from arbitrary bytes. - pub fn bytes(value: T) -> Part - where - T: Into>, - { - let body = match value.into() { - Cow::Borrowed(slice) => hyper::Body::from(slice).into(), - Cow::Owned(vec) => hyper::Body::from(vec).into(), - }; - Part::new(body, None) - } - - #[allow(dead_code)] - /// Makes a new parameter from an arbitrary stream. - pub(crate) fn stream>(value: T) -> Part { - Part::new(value.into(), None) - } - - #[allow(dead_code)] - /// Makes a new parameter from an arbitrary stream with a known length. This is particularly - /// useful when adding something like file contents as a stream, where you can know the content - /// length beforehand. - pub(crate) fn stream_with_length>(value: T, length: u64) -> Part { - Part::new(value.into(), Some(length)) - } - - fn new(value: FormBody, body_length: Option) -> Part { - Part { - meta: PartMetadata::new(), - value, - body_length, - } - } - - /// Tries to set the mime of this part. - pub fn mime_str(self, mime: &str) -> FormResult { - Ok(self.mime(Mime::from_str(mime).map_err(Box::new)?)) - } - - // Re-export when mime 0.4 is available, with split MediaType/MediaRange. - fn mime(self, mime: Mime) -> Part { - self.with_inner(move |inner| inner.mime(mime)) - } - - /// Sets the filename, builder style. - pub fn file_name(self, filename: T) -> Part - where - T: Into>, - { - self.with_inner(move |inner| inner.file_name(filename)) - } - - fn with_inner(self, func: F) -> Self - where - F: FnOnce(PartMetadata) -> PartMetadata, - { - Part { - meta: func(self.meta), - ..self - } - } -} - -impl fmt::Debug for Part { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut dbg = f.debug_struct("Part"); - dbg.field("value", &self.value); - self.meta.fmt_fields(&mut dbg); - dbg.finish() - } -} - -impl PartProps for Part { - fn value_len(&self) -> Option { - if self.body_length.is_some() { - self.body_length - } else { - self.value.content_length() - } - } - - fn metadata(&self) -> &PartMetadata { - &self.meta - } -} - -// ===== impl FormParts ===== - -impl FormParts

{ - pub(crate) fn new() -> Self { - FormParts { - boundary: gen_boundary(), - computed_headers: Vec::new(), - fields: Vec::new(), - percent_encoding: PercentEncoding::PathSegment, - } - } - - pub(crate) fn boundary(&self) -> &str { - &self.boundary - } - - /// Adds a customized Part. - pub(crate) fn part(mut self, name: T, part: P) -> Self - where - T: Into>, - { - self.fields.push((name.into(), part)); - self - } - - #[allow(dead_code)] - /// Configure this `Form` to percent-encode using the `path-segment` rules. - pub(crate) fn percent_encode_path_segment(mut self) -> Self { - self.percent_encoding = PercentEncoding::PathSegment; - self - } - - #[allow(dead_code)] - /// Configure this `Form` to percent-encode using the `attr-char` rules. - pub(crate) fn percent_encode_attr_chars(mut self) -> Self { - self.percent_encoding = PercentEncoding::AttrChar; - self - } - - #[allow(dead_code)] - /// Configure this `Form` to skip percent-encoding - pub(crate) fn percent_encode_noop(mut self) -> Self { - self.percent_encoding = PercentEncoding::NoOp; - self - } - - // If predictable, computes the length the request will have - // The length should be preditable if only String and file fields have been added, - // but not if a generic reader has been added; - pub(crate) fn compute_length(&mut self) -> Option { - let mut length = 0u64; - for &(ref name, ref field) in self.fields.iter() { - match field.value_len() { - Some(value_length) => { - // We are constructing the header just to get its length. To not have to - // construct it again when the request is sent we cache these headers. - let header = self.percent_encoding.encode_headers(name, field.metadata()); - let header_length = header.len(); - self.computed_headers.push(header); - // The additions mimic the format string out of which the field is constructed - // in Reader. Not the cleanest solution because if that format string is - // ever changed then this formula needs to be changed too which is not an - // obvious dependency in the code. - length += 2 - + self.boundary().len() as u64 - + 2 - + header_length as u64 - + 4 - + value_length - + 2 - } - _ => return None, - } - } - // If there is a at least one field there is a special boundary for the very last field. - if !self.fields.is_empty() { - length += 2 + self.boundary().len() as u64 + 4 - } - Some(length) - } - - /// Take the fields vector of this instance, replacing with an empty vector. - fn take_fields(&mut self) -> Vec<(Cow<'static, str>, P)> { - std::mem::replace(&mut self.fields, Vec::new()) - } -} - -impl FormParts

{ - pub(crate) fn fmt_fields(&self, ty_name: &'static str, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct(ty_name) - .field("boundary", &self.boundary) - .field("parts", &self.fields) - .finish() - } -} - -// ===== impl PartMetadata ===== - -impl PartMetadata { - pub(crate) fn new() -> Self { - PartMetadata { - mime: None, - file_name: None, - headers: HeaderMap::default(), - } - } - - pub(crate) fn mime(mut self, mime: Mime) -> Self { - self.mime = Some(mime); - self - } - - pub(crate) fn file_name(mut self, filename: T) -> Self - where - T: Into>, - { - self.file_name = Some(filename.into()); - self - } -} - -impl PartMetadata { - pub(crate) fn fmt_fields<'f, 'fa, 'fb>( - &self, - debug_struct: &'f mut fmt::DebugStruct<'fa, 'fb>, - ) -> &'f mut fmt::DebugStruct<'fa, 'fb> { - debug_struct - .field("mime", &self.mime) - .field("file_name", &self.file_name) - .field("headers", &self.headers) - } -} - -// https://url.spec.whatwg.org/#fragment-percent-encode-set -const FRAGMENT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS - .add(b' ') - .add(b'"') - .add(b'<') - .add(b'>') - .add(b'`'); - -// https://url.spec.whatwg.org/#path-percent-encode-set -const PATH_ENCODE_SET: &AsciiSet = &FRAGMENT_ENCODE_SET.add(b'#').add(b'?').add(b'{').add(b'}'); - -const PATH_SEGMENT_ENCODE_SET: &AsciiSet = &PATH_ENCODE_SET.add(b'/').add(b'%'); - -#[allow(dead_code)] -// https://tools.ietf.org/html/rfc8187#section-3.2.1 -const ATTR_CHAR_ENCODE_SET: &AsciiSet = &NON_ALPHANUMERIC - .remove(b'!') - .remove(b'#') - .remove(b'$') - .remove(b'&') - .remove(b'+') - .remove(b'-') - .remove(b'.') - .remove(b'^') - .remove(b'_') - .remove(b'`') - .remove(b'|') - .remove(b'~'); - -#[allow(dead_code)] -pub(crate) enum PercentEncoding { - PathSegment, - AttrChar, - NoOp, -} - -impl PercentEncoding { - pub(crate) fn encode_headers(&self, name: &str, field: &PartMetadata) -> Vec { - let s = format!( - "Content-Disposition: form-data; {}{}{}", - self.format_parameter("name", name), - match field.file_name { - Some(ref file_name) => format!("; {}", self.format_filename(file_name)), - None => String::new(), - }, - match field.mime { - Some(ref mime) => format!("\r\nContent-Type: {}", mime), - None => "".to_string(), - }, - ); - field - .headers - .iter() - .fold(s.into_bytes(), |mut header, (k, v)| { - header.extend_from_slice(b"\r\n"); - header.extend_from_slice(k.as_str().as_bytes()); - header.extend_from_slice(b": "); - header.extend_from_slice(v.as_bytes()); - header - }) - } - - // According to RFC7578 Section 4.2, `filename*=` syntax is invalid. - // See https://github.com/seanmonstar/reqwest/issues/419. - fn format_filename(&self, filename: &str) -> String { - let legal_filename = filename - .replace("\\", "\\\\") - .replace("\"", "\\\"") - .replace("\r", "\\\r") - .replace("\n", "\\\n"); - format!("filename=\"{}\"", legal_filename) - } - - fn format_parameter(&self, name: &str, value: &str) -> String { - let legal_value = match *self { - PercentEncoding::PathSegment => { - percent_encoding::utf8_percent_encode(value, PATH_SEGMENT_ENCODE_SET).to_string() - } - PercentEncoding::AttrChar => { - percent_encoding::utf8_percent_encode(value, ATTR_CHAR_ENCODE_SET).to_string() - } - PercentEncoding::NoOp => value.to_string(), - }; - if value.len() == legal_value.len() { - // nothing has been percent encoded - format!("{}=\"{}\"", name, value) - } else { - // something has been percent encoded - format!("{}*=utf-8''{}", name, legal_value) - } - } -} - -fn fast_random() -> u64 { - use std::cell::Cell; - use std::collections::hash_map::RandomState; - use std::hash::{BuildHasher, Hasher}; - use std::num::Wrapping; - - thread_local! { - static RNG: Cell> = Cell::new(Wrapping(seed())); - } - - fn seed() -> u64 { - let seed = RandomState::new(); - - let mut out = 0; - let mut cnt = 0; - while out == 0 { - cnt += 1; - let mut hasher = seed.build_hasher(); - hasher.write_usize(cnt); - out = hasher.finish(); - } - out - } - - RNG.with(|rng| { - let mut n = rng.get(); - debug_assert_ne!(n.0, 0); - n ^= n >> 12; - n ^= n << 25; - n ^= n >> 27; - rng.set(n); - n.0.wrapping_mul(0x2545_f491_4f6c_dd1d) - }) -} - -fn gen_boundary() -> String { - let a = fast_random(); - let b = fast_random(); - let c = fast_random(); - let d = fast_random(); - - format!("{:016x}-{:016x}-{:016x}-{:016x}", a, b, c, d) -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::TryStreamExt; - use futures_util::{future, stream}; - use tokio::{self, runtime}; - - #[test] - fn form_empty() { - let form = Form::new(); - - let rt = runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("new rt"); - let body = form.stream().into_stream(); - let s = body.map_ok(|try_c| try_c.to_vec()).try_concat(); - - let out = rt.block_on(s); - assert!(out.unwrap().is_empty()); - } - - #[test] - fn stream_to_end() { - let mut form = Form::new() - .part( - "reader1", - Part::stream(hyper::Body::wrap_stream(stream::once(future::ready::< - FormResult, - >( - Ok("part1".to_owned()), - )))) - .into(), - ) - .part("key1", Part::text("value1")) - .part( - "key2", - Part::text("value2").mime(mime_guess::mime::IMAGE_BMP), - ) - .part( - "reader2", - Part::stream(hyper::Body::wrap_stream(stream::once(future::ready::< - FormResult, - >( - Ok("part2".to_owned()), - )))), - ) - .part("key3", Part::text("value3").file_name("filename")); - form.inner.boundary = "boundary".to_string(); - let expected = "--boundary\r\n\ - Content-Disposition: form-data; name=\"reader1\"\r\n\r\n\ - part1\r\n\ - --boundary\r\n\ - Content-Disposition: form-data; name=\"key1\"\r\n\r\n\ - value1\r\n\ - --boundary\r\n\ - Content-Disposition: form-data; name=\"key2\"\r\n\ - Content-Type: image/bmp\r\n\r\n\ - value2\r\n\ - --boundary\r\n\ - Content-Disposition: form-data; name=\"reader2\"\r\n\r\n\ - part2\r\n\ - --boundary\r\n\ - Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\ - value3\r\n--boundary--\r\n"; - let rt = runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("new rt"); - let body = form.stream().into_stream(); - let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat(); - - let out = rt.block_on(s).unwrap(); - // These prints are for debug purposes in case the test fails - println!( - "START REAL\n{}\nEND REAL", - std::str::from_utf8(&out).unwrap() - ); - println!("START EXPECTED\n{}\nEND EXPECTED", expected); - assert_eq!(std::str::from_utf8(&out).unwrap(), expected); - } - - #[test] - fn stream_to_end_with_header() { - let mut part = Part::text("value2").mime(mime_guess::mime::IMAGE_BMP); - part.meta.headers.insert("Hdr3", "/a/b/c".parse().unwrap()); - let mut form = Form::new().part("key2", part); - form.inner.boundary = "boundary".to_string(); - let expected = "--boundary\r\n\ - Content-Disposition: form-data; name=\"key2\"\r\n\ - Content-Type: image/bmp\r\n\ - hdr3: /a/b/c\r\n\ - \r\n\ - value2\r\n\ - --boundary--\r\n"; - let rt = runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("new rt"); - let body = form.stream().into_stream(); - let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat(); - - let out = rt.block_on(s).unwrap(); - // These prints are for debug purposes in case the test fails - println!( - "START REAL\n{}\nEND REAL", - std::str::from_utf8(&out).unwrap() - ); - println!("START EXPECTED\n{}\nEND EXPECTED", expected); - assert_eq!(std::str::from_utf8(&out).unwrap(), expected); - } - - #[test] - fn correct_content_length() { - // Setup an arbitrary data stream - let stream_data = b"just some stream data"; - let stream_len = stream_data.len(); - let stream_data = stream_data - .chunks(3) - .map(|c| Ok::<_, std::io::Error>(Bytes::from(c))); - let the_stream = futures_util::stream::iter(stream_data); - - let bytes_data = b"some bytes data".to_vec(); - let bytes_len = bytes_data.len(); - - let stream_part = - Part::stream_with_length(hyper::Body::wrap_stream(the_stream), stream_len as u64); - let body_part = Part::bytes(bytes_data); - - // A simple check to make sure we get the configured body length - assert_eq!(stream_part.value_len().unwrap(), stream_len as u64); - - // Make sure it delegates to the underlying body if length is not specified - assert_eq!(body_part.value_len().unwrap(), bytes_len as u64); - } - - #[test] - fn header_percent_encoding() { - let name = "start%'\"\r\nßend"; - let field = Part::text(""); - - assert_eq!( - PercentEncoding::PathSegment.encode_headers(name, &field.meta), - &b"Content-Disposition: form-data; name*=utf-8''start%25'%22%0D%0A%C3%9Fend"[..] - ); - - assert_eq!( - PercentEncoding::AttrChar.encode_headers(name, &field.meta), - &b"Content-Disposition: form-data; name*=utf-8''start%25%27%22%0D%0A%C3%9Fend"[..] - ); - } -} From 12d795fb676c1a09edb6b3f5ec0e092f4a8a76c3 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 11 Mar 2022 13:31:42 +0100 Subject: [PATCH 12/14] Enable tls1.2 on rustls --- ddprof-exporter/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index a47a039..b9d754a 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -26,7 +26,7 @@ futures-util = { version = "0.3.0", default-features = false } mime_guess = { version = "2.0", default-features = false } http-body = "0.4" pin-project-lite = "0.2.0" -hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1"] } +hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] } hex = "0.4" hyper-multipart-rfc7578 = { git = "https://github.com/paullegranddc/rust-multipart-rfc7578.git", rev = "0e0812e0241601a46986c062d1c4ba8574f437a5" } From 8a18e2d4313683ffe5bb4dbbc8fa39775a733b4f Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 11 Mar 2022 13:35:29 +0100 Subject: [PATCH 13/14] update rust-multipart-rfc7578 fork ref --- Cargo.lock | 4 ++-- ddprof-exporter/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 724ba88..d09959e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,7 +75,7 @@ dependencies = [ [[package]] name = "common-multipart-rfc7578" version = "0.4.2" -source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=0e0812e0241601a46986c062d1c4ba8574f437a5#0e0812e0241601a46986c062d1c4ba8574f437a5" +source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=8dcedc266e50876c04c91d24390fe9ac44f10b96#8dcedc266e50876c04c91d24390fe9ac44f10b96" dependencies = [ "bytes", "futures-core", @@ -362,7 +362,7 @@ dependencies = [ [[package]] name = "hyper-multipart-rfc7578" version = "0.6.2" -source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=0e0812e0241601a46986c062d1c4ba8574f437a5#0e0812e0241601a46986c062d1c4ba8574f437a5" +source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=8dcedc266e50876c04c91d24390fe9ac44f10b96#8dcedc266e50876c04c91d24390fe9ac44f10b96" dependencies = [ "bytes", "common-multipart-rfc7578", diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index b9d754a..55df406 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -28,7 +28,7 @@ http-body = "0.4" pin-project-lite = "0.2.0" hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] } hex = "0.4" -hyper-multipart-rfc7578 = { git = "https://github.com/paullegranddc/rust-multipart-rfc7578.git", rev = "0e0812e0241601a46986c062d1c4ba8574f437a5" } +hyper-multipart-rfc7578 = { git = "https://github.com/paullegranddc/rust-multipart-rfc7578.git", rev = "8dcedc266e50876c04c91d24390fe9ac44f10b96" } [dev-dependencies] maplit = "1.0" From ae479f64e0ffcaee29cd0617f88e5b4e1be6fd19 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 11 Mar 2022 14:11:39 +0100 Subject: [PATCH 14/14] Add filenames to form part --- ddprof-exporter/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index de08fb4..5a3b20b 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -191,9 +191,10 @@ impl ProfileExporterV3 { } for file in files { - form.add_reader( + form.add_reader_file( format!("data[{}]", file.name), Cursor::new(file.bytes.to_owned()), + file.name, ) }