From 62bd883e7d30eb0a6ba9dd45d24147f0fb692b02 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH] feat(http/body): Body middleware bridging http-body 0.4 and 1.0 hyper 1.0 migrated to a new definition of the `Body` trait, which exposes a single `poll_frame(..)` method, rather than the two `poll_data(..)` and `poll_trailers(..)` methods previously required of implementors. this commit introduces a new `linkerd-http-body` crate. this crate allows callers to wrap a "legacy" body that implements the 0.4 version of `http_body::Body` in a middleware to incrementally adopt the new 1.0 interface. see . Signed-off-by: katelyn martin --- Cargo.lock | 163 ++++++++++++++++------------ Cargo.toml | 1 + linkerd/http/body/Cargo.toml | 19 ++++ linkerd/http/body/src/lib.rs | 200 +++++++++++++++++++++++++++++++++++ 4 files changed, 318 insertions(+), 65 deletions(-) create mode 100644 linkerd/http/body/Cargo.toml create mode 100644 linkerd/http/body/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 92ac5a6040..408eeff068 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,8 +164,8 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "itoa", "matchit", @@ -190,8 +190,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -806,7 +806,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", "indexmap 2.7.0", "slab", "tokio", @@ -911,6 +911,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -918,10 +929,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.2.0", +] + [[package]] name = "httparse" version = "1.9.5" @@ -950,8 +971,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -968,8 +989,8 @@ name = "hyper-balance" version = "0.1.0" dependencies = [ "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "pin-project", "tokio", @@ -1279,7 +1300,7 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" name = "linkerd-addr" version = "0.1.0" dependencies = [ - "http", + "http 0.2.11", "ipnet", "linkerd-dns-name", "thiserror 2.0.8", @@ -1316,8 +1337,8 @@ dependencies = [ "bytes", "deflate", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "linkerd-app-core", "linkerd-app-inbound", @@ -1338,8 +1359,8 @@ dependencies = [ "bytes", "drain", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "ipnet", "linkerd-addr", @@ -1404,7 +1425,7 @@ name = "linkerd-app-gateway" version = "0.1.0" dependencies = [ "futures", - "http", + "http 0.2.11", "linkerd-app-core", "linkerd-app-inbound", "linkerd-app-outbound", @@ -1428,7 +1449,7 @@ dependencies = [ "arbitrary", "bytes", "futures", - "http", + "http 0.2.11", "hyper", "libfuzzer-sys", "linkerd-app-core", @@ -1464,8 +1485,8 @@ dependencies = [ "flate2", "futures", "h2", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "ipnet", "linkerd-app", @@ -1499,8 +1520,8 @@ dependencies = [ "bytes", "futures", "futures-util", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "linkerd-app-core", "linkerd-app-test", @@ -1542,8 +1563,8 @@ version = "0.1.0" dependencies = [ "futures", "h2", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "linkerd-app-core", "linkerd-http-route", @@ -1668,7 +1689,7 @@ name = "linkerd-http-access-log" version = "0.1.0" dependencies = [ "futures-core", - "http", + "http 0.2.11", "humantime", "linkerd-identity", "linkerd-proxy-transport", @@ -1680,14 +1701,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-http-body" +version = "0.1.0" +dependencies = [ + "bytes", + "http 0.2.11", + "http 1.2.0", + "http-body 0.4.6", + "http-body 1.0.1", + "pin-project", +] + [[package]] name = "linkerd-http-box" version = "0.1.0" dependencies = [ "bytes", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-error", "linkerd-stack", "pin-project", @@ -1698,8 +1731,8 @@ name = "linkerd-http-classify" version = "0.1.0" dependencies = [ "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-error", "linkerd-http-box", "linkerd-stack", @@ -1729,7 +1762,7 @@ name = "linkerd-http-insert" version = "0.1.0" dependencies = [ "futures", - "http", + "http 0.2.11", "linkerd-stack", "pin-project", "tower", @@ -1741,8 +1774,8 @@ version = "0.1.0" dependencies = [ "bytes", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "linkerd-error", "linkerd-http-classify", @@ -1759,7 +1792,7 @@ dependencies = [ name = "linkerd-http-override-authority" version = "0.1.0" dependencies = [ - "http", + "http 0.2.11", "linkerd-stack", "tower", "tracing", @@ -1771,8 +1804,8 @@ version = "0.1.0" dependencies = [ "bytes", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-error", "linkerd-http-box", "linkerd-metrics", @@ -1788,8 +1821,8 @@ dependencies = [ name = "linkerd-http-retain" version = "0.1.0" dependencies = [ - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-stack", "pin-project", "tower", @@ -1801,8 +1834,8 @@ version = "0.1.0" dependencies = [ "bytes", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "linkerd-error", "linkerd-exp-backoff", @@ -1821,7 +1854,7 @@ dependencies = [ name = "linkerd-http-route" version = "0.1.0" dependencies = [ - "http", + "http 0.2.11", "linkerd2-proxy-api", "maplit", "rand", @@ -1836,8 +1869,8 @@ name = "linkerd-http-stream-timeouts" version = "0.1.0" dependencies = [ "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-error", "linkerd-stack", "parking_lot", @@ -1854,8 +1887,8 @@ dependencies = [ "bytes", "drain", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "linkerd-duplex", "linkerd-error", @@ -1873,7 +1906,7 @@ dependencies = [ name = "linkerd-http-version" version = "0.1.0" dependencies = [ - "http", + "http 0.2.11", "thiserror 2.0.8", ] @@ -2000,8 +2033,8 @@ name = "linkerd-metrics" version = "0.1.0" dependencies = [ "deflate", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "linkerd-http-box", "linkerd-stack", @@ -2022,8 +2055,8 @@ name = "linkerd-opencensus" version = "0.1.0" dependencies = [ "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-error", "linkerd-metrics", "linkerd-trace-context", @@ -2039,8 +2072,8 @@ name = "linkerd-opentelemetry" version = "0.1.0" dependencies = [ "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-error", "linkerd-metrics", "linkerd-trace-context", @@ -2103,8 +2136,8 @@ name = "linkerd-proxy-api-resolve" version = "0.1.0" dependencies = [ "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-addr", "linkerd-error", "linkerd-http-h2", @@ -2176,7 +2209,7 @@ name = "linkerd-proxy-client-policy" version = "0.1.0" dependencies = [ "ahash", - "http", + "http 0.2.11", "ipnet", "linkerd-error", "linkerd-exp-backoff", @@ -2228,8 +2261,8 @@ dependencies = [ "drain", "futures", "h2", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "httparse", "hyper", "hyper-balance", @@ -2267,7 +2300,7 @@ name = "linkerd-proxy-identity-client" version = "0.1.0" dependencies = [ "futures", - "http-body", + "http-body 0.4.6", "linkerd-dns-name", "linkerd-error", "linkerd-identity", @@ -2300,7 +2333,7 @@ name = "linkerd-proxy-server-policy" version = "0.1.0" dependencies = [ "governor", - "http", + "http 0.2.11", "ipnet", "linkerd-http-route", "linkerd-identity", @@ -2341,8 +2374,8 @@ version = "0.1.0" dependencies = [ "bytes", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "ipnet", "linkerd-conditional", @@ -2443,8 +2476,8 @@ version = "0.1.0" dependencies = [ "bytes", "futures", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "linkerd-addr", "linkerd-dns-name", "linkerd-error", @@ -2602,7 +2635,7 @@ dependencies = [ "bytes", "futures", "hex", - "http", + "http 0.2.11", "linkerd-error", "linkerd-stack", "rand", @@ -2682,7 +2715,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4682c00263191a5bfa4fbe64f6d80b22ff2b49aaa294da5aac062f5abc6eb9e" dependencies = [ "h2", - "http", + "http 0.2.11", "ipnet", "prost", "prost-types", @@ -3928,8 +3961,8 @@ dependencies = [ "base64 0.21.7", "bytes", "h2", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "hyper", "hyper-timeout", "percent-encoding", diff --git a/Cargo.toml b/Cargo.toml index 49d05c6e56..2d21e73ebe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "linkerd/error-respond", "linkerd/exp-backoff", "linkerd/http/access-log", + "linkerd/http/body", "linkerd/http/box", "linkerd/http/classify", "linkerd/http/executor", diff --git a/linkerd/http/body/Cargo.toml b/linkerd/http/body/Cargo.toml new file mode 100644 index 0000000000..de9cd666b6 --- /dev/null +++ b/linkerd/http/body/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "linkerd-http-body" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +bytes = "1.6" +pin-project = "1" + +# Legacy HTTP/Hyper dependencies. +http-0-2 = { package = "http", version = "0.2" } +http-body-0-4 = { package = "http-body", version = "0.4" } + +# 1.0 HTTP/Hyper dependencies. +http-1-2 = { package = "http", version = "1.2" } +http-body-1-0 = { package = "http-body", version = "1.0" } diff --git a/linkerd/http/body/src/lib.rs b/linkerd/http/body/src/lib.rs new file mode 100644 index 0000000000..c436669187 --- /dev/null +++ b/linkerd/http/body/src/lib.rs @@ -0,0 +1,200 @@ +//! Asynchronous request or response body types. + +use pin_project::pin_project; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +/// Reƫxports the legacy interfaces of the [`http-body`] crate. +pub mod legacy { + pub use http_0_2 as http; + pub use http_body_0_4 as http_body; +} + +/// Reƫxports the v1.0 interfaces of the [`http-body`] crate. +pub mod v1_0 { + pub use http_1_2 as http; + pub use http_body_1_0 as http_body; +} + +/// Compatibility bridges between new and legacy crates. +mod compat { + use {crate::legacy::http as http_legacy, crate::v1_0::http as http_v1}; + + /// Returns an upgraded [`HeaderMap`][http_legacy::HeaderMap]. + pub(super) fn headers(mut legacy: http_legacy::HeaderMap) -> http_v1::HeaderMap { + let mut out = http_v1::HeaderMap::with_capacity(legacy.len()); + + // Iterate through the header map. + // + // NB: A `None` header name means that this is a value associated with the last previously + // seen header name. Because we must hang onto this value, cloning it when appending to + // the next entry, mark this as allowing for "unused" assignments. + #[allow(unused_assignments)] + let mut name = None; + for (mut curr, value) in legacy.drain() { + // Cache this as our last seen name, if this item has a name associated. + name = curr.take().map(header_name); + let value = header_value(value); + // Append this name/value pair to our upgraded header map. + out.append(name.clone().unwrap(), value); + } + + out + } + + fn header_name(legacy: http_legacy::HeaderName) -> http_v1::HeaderName { + let bytes = legacy.as_str().as_bytes(); + http_v1::HeaderName::from_bytes(bytes).unwrap() + } + + fn header_value(legacy: http_legacy::HeaderValue) -> http_v1::HeaderValue { + let bytes = legacy.as_bytes(); + http_v1::HeaderValue::from_bytes(bytes).unwrap() + } +} + +/// Wraps a legacy v0.4 asynchronous request or response body. +#[pin_project(project = LegacyHyperAdaptorBodyPinned)] +pub struct LegacyHyperAdaptorBody { + #[pin] + inner: B, + /// True if the inner body's data has been polled. + data_finished: bool, + /// True if the inner body's stream has been completed. + stream_finished: bool, +} + +// === impl LegacyHyperAdaptorBody === + +impl LegacyHyperAdaptorBody +where + B: crate::legacy::http_body::Body, +{ + /// Returns a legacy adaptor body. + pub fn new(body: B) -> Self { + let empty = body.is_end_stream(); + + Self { + inner: body, + // These two flags should be true if the inner body is already exhausted. + data_finished: empty, + stream_finished: empty, + } + } +} + +/// A [`LegacyHyperAdaptorBody`] implements the v1.0 [`Body`][body] interface. +/// +/// [body]: crate::v1_0::http_body::Body +impl crate::v1_0::http_body::Body for LegacyHyperAdaptorBody +where + B: crate::legacy::http_body::Body, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll< + Option< + Result::Data>, Self::Error>, + >, + > { + use crate::v1_0::http_body::Frame; + + let LegacyHyperAdaptorBodyPinned { + inner, + data_finished, + stream_finished, + } = self.project(); + + // If the stream is finished, return `None`. + if *stream_finished { + return Poll::Ready(None); + } + + // If all of the body's data has been polled, it is time to poll the trailers. + if *data_finished { + let trailers = ready!(inner.poll_trailers(cx)) + .map(|o| o.map(crate::compat::headers).map(Frame::trailers)) + .transpose(); + // Mark this stream as finished once the inner body yields a result. + *stream_finished = true; + return Poll::Ready(trailers); + } + + // If we're here, we should poll the inner body for a chunk of data. + let data = ready!(inner.poll_data(cx)).map(|r| r.map(Frame::data)); + if data.is_none() { + // If `None` was yielded, mark the data as finished. + *data_finished = true; + } else if matches!(data, Some(Err(_))) { + // If an error was yielded, the data is finished *and* the stream is finished. + *data_finished = true; + *stream_finished = true; + } + + Poll::Ready(data) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.stream_finished + } + + fn size_hint(&self) -> crate::v1_0::http_body::SizeHint { + let mut size_hint = crate::v1_0::http_body::SizeHint::new(); + let legacy = self.inner.size_hint(); + + // Set the lower, upper, and exact values on the emitted size hint. + size_hint.set_lower(legacy.lower()); + if let Some(upper) = legacy.upper() { + size_hint.set_upper(upper); + } + if let Some(exact) = legacy.exact() { + size_hint.set_exact(exact); + } + + size_hint + } +} + +/// A [`LegacyHyperAdaptorBody`] implements the legacy [`Body`][body] interface. +/// +/// [body]: crate::legacy::http_body::Body +impl crate::legacy::http_body::Body for LegacyHyperAdaptorBody +where + B: crate::legacy::http_body::Body, +{ + type Data = B::Data; + type Error = B::Error; + + #[inline] + fn poll_data( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> { + self.project().inner.poll_data(cx) + } + + #[inline] + fn poll_trailers( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>> { + self.project().inner.poll_trailers(cx) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> crate::legacy::http_body::SizeHint { + self.inner.size_hint() + } +}