Skip to content

Commit

Permalink
Use the connection's HTTP version in transport header (#1533)
Browse files Browse the repository at this point in the history
Currently, when the outbound proxy communicates with a meshed endpoint
that uses opaque transport (i.e. multi-cluster gateways), it *always*
sets the gateway header's session protocol to HTTP/2, since the meshed
endpoint supports HTTP/2 protocol upgrading. But the HTTP client may
choose not to use HTTP/2 if the request includes the `Upgrade` header,
as it does for WebSocket connections. In these cases, the transport
header should indicate that the connection is HTTP/1.

This change modifies the HTTP client to pass the used protocol version
when building a connection. This value is then used to set the session
protocol header when it is required..

Signed-off-by: Oliver Gould <ver@buoyant.io>
(cherry picked from commit cbb3390)
Signed-off-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
olix0r committed Mar 30, 2022
1 parent c6000b5 commit 8b7dd10
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 49 deletions.
1 change: 1 addition & 0 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl Config {
svc::stack(ConnectTcp::new(self.connect.keepalive))
.push(tls::Client::layer(identity))
.push_connect_timeout(self.connect.timeout)
.push_map_target(|(_version, target)| target)
.push(self::client::layer())
.push_on_service(svc::MapErr::layer(Into::into))
.into_new_service()
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ impl<C> Inbound<C> {
let http = connect
.push(svc::stack::BoxFuture::layer())
.push(transport::metrics::Client::layer(rt.metrics.proxy.transport.clone()))
.check_service::<Http>()
.push_map_target(|(_version, target)| target)
.push(http::client::layer(
config.proxy.connect.h1_settings,
config.proxy.connect.h2_settings,
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ linkerd-app-test = { path = "../test" }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
parking_lot = "0.12"
tokio = { version = "1", features = ["time", "macros"] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-test = "0.4"
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl<S> Outbound<S> {
{
let http = self
.clone()
.push_tcp_endpoint::<http::Endpoint>()
.push_tcp_endpoint::<http::Connect>()
.push_http_endpoint()
.push_http_server()
.into_inner();
Expand Down
18 changes: 3 additions & 15 deletions linkerd/app/outbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ use linkerd_app_core::{
profiles::{self, LogicalAddr},
proxy::{api_resolve::ProtocolHint, tap},
svc::Param,
tls,
transport_header::SessionProtocol,
Addr, Conditional, CANONICAL_DST_HEADER,
tls, Addr, Conditional, CANONICAL_DST_HEADER,
};
use std::{net::SocketAddr, str::FromStr};

Expand All @@ -30,6 +28,8 @@ pub type Logical = crate::logical::Logical<Version>;
pub type Concrete = crate::logical::Concrete<Version>;
pub type Endpoint = crate::endpoint::Endpoint<Version>;

pub type Connect = self::endpoint::Connect<Endpoint>;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Route {
logical: Logical,
Expand Down Expand Up @@ -150,18 +150,6 @@ impl Param<client::Settings> for Endpoint {
}
}

impl Param<Option<SessionProtocol>> for Endpoint {
fn param(&self) -> Option<SessionProtocol> {
match self.protocol {
Version::H2 => Some(SessionProtocol::Http2),
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Http2 => Some(SessionProtocol::Http2),
ProtocolHint::Unknown => Some(SessionProtocol::Http1),
},
}
}
}

impl tap::Inspect for Endpoint {
fn src_addr<B>(&self, req: &Request<B>) -> Option<SocketAddr> {
req.extensions().get::<ClientHandle>().map(|c| c.addr)
Expand Down
157 changes: 134 additions & 23 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::{NewRequireIdentity, NewStripProxyError, ProxyConnectionClose};
use crate::Outbound;
use crate::{tcp::opaque_transport, Outbound};
use linkerd_app_core::{
classify, config, errors, http_tracing, metrics,
proxy::{http, tap},
svc::{self, ExtractParam},
tls, Error, Result, CANONICAL_DST_HEADER,
tls,
transport::{self, Remote, ServerAddr},
transport_header::SessionProtocol,
Error, Result, CANONICAL_DST_HEADER,
};
use tokio::io;

Expand All @@ -13,6 +16,12 @@ struct ClientRescue {
emit_headers: bool,
}

#[derive(Clone, Debug)]
pub struct Connect<T> {
version: http::Version,
inner: T,
}

impl<C> Outbound<C> {
pub fn push_http_endpoint<T, B>(self) -> Outbound<svc::ArcNewHttp<T, B>>
where
Expand All @@ -24,7 +33,7 @@ impl<C> Outbound<C> {
+ tap::Inspect,
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
B::Data: Send + 'static,
C: svc::Service<T> + Clone + Send + Sync + Unpin + 'static,
C: svc::Service<Connect<T>> + Clone + Send + Sync + Unpin + 'static,
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
C::Future: Send + Unpin + 'static,
Expand All @@ -40,7 +49,9 @@ impl<C> Outbound<C> {
// Initiates an HTTP client on the underlying transport. Prior-knowledge HTTP/2
// is typically used (i.e. when communicating with other proxies); though
// HTTP/1.x fallback is supported as needed.
connect
svc::stack(connect.into_inner())
.check_service::<Connect<T>>()
.push_map_target(|(version, inner)| Connect { version, inner })
.push(http::client::layer(h1_settings, h2_settings))
.push_on_service(svc::MapErr::layer(Into::<Error>::into))
.check_service::<T>()
Expand Down Expand Up @@ -133,17 +144,72 @@ impl errors::HttpRescue<Error> for ClientRescue {
}
}

// === impl Connect ===

impl<T> svc::Param<Option<SessionProtocol>> for Connect<T> {
#[inline]
fn param(&self) -> Option<SessionProtocol> {
match self.version {
http::Version::Http1 => Some(SessionProtocol::Http1),
http::Version::H2 => Some(SessionProtocol::Http2),
}
}
}

impl<T: svc::Param<Remote<ServerAddr>>> svc::Param<Remote<ServerAddr>> for Connect<T> {
#[inline]
fn param(&self) -> Remote<ServerAddr> {
self.inner.param()
}
}

impl<T: svc::Param<tls::ConditionalClientTls>> svc::Param<tls::ConditionalClientTls>
for Connect<T>
{
#[inline]
fn param(&self) -> tls::ConditionalClientTls {
self.inner.param()
}
}

impl<T: svc::Param<Option<opaque_transport::PortOverride>>>
svc::Param<Option<opaque_transport::PortOverride>> for Connect<T>
{
#[inline]
fn param(&self) -> Option<opaque_transport::PortOverride> {
self.inner.param()
}
}

impl<T: svc::Param<Option<http::AuthorityOverride>>> svc::Param<Option<http::AuthorityOverride>>
for Connect<T>
{
#[inline]
fn param(&self) -> Option<http::AuthorityOverride> {
self.inner.param()
}
}

impl<T: svc::Param<transport::labels::Key>> svc::Param<transport::labels::Key> for Connect<T> {
#[inline]
fn param(&self) -> transport::labels::Key {
self.inner.param()
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{http, test_util::*, transport::addrs::*};
use crate::{http, test_util::*};
use ::http::header::{CONNECTION, UPGRADE};
use linkerd_app_core::{
io,
proxy::api_resolve::Metadata,
svc::{NewService, ServiceExt},
Infallible,
};
use std::net::SocketAddr;
use support::resolver::ProtocolHint;

static WAS_ORIG_PROTO: &str = "request-orig-proto";

Expand All @@ -155,7 +221,7 @@ mod test {
let addr = SocketAddr::new([192, 0, 2, 41].into(), 2041);

let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_11));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_11));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand Down Expand Up @@ -192,7 +258,7 @@ mod test {
let addr = SocketAddr::new([192, 0, 2, 41].into(), 2042);

let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand Down Expand Up @@ -231,7 +297,7 @@ mod test {

// Pretend the upstream is a proxy that supports proto upgrades...
let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand All @@ -246,13 +312,7 @@ mod test {
logical_addr: None,
opaque_protocol: false,
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
metadata: Metadata::new(
None,
support::resolver::ProtocolHint::Http2,
None,
None,
None,
),
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
});

let req = http::Request::builder()
Expand All @@ -271,6 +331,63 @@ mod test {
);
}

#[tokio::test(flavor = "current_thread")]
async fn orig_proto_skipped_on_http_upgrade() {
let _trace = linkerd_tracing::test::trace_init();

let addr = SocketAddr::new([192, 0, 2, 41].into(), 2041);

// Pretend the upstream is a proxy that supports proto upgrades. The service needs to
// support both HTTP/1 and HTTP/2 because an HTTP/2 connection is maintained by default and
// HTTP/1 connections are created as-needed.
let connect = support::connect().endpoint_fn_boxed(addr, |c: http::Connect| {
serve(match svc::Param::param(&c) {
Some(SessionProtocol::Http1) => ::http::Version::HTTP_11,
Some(SessionProtocol::Http2) => ::http::Version::HTTP_2,
None => unreachable!(),
})
});

// Build the outbound server
let (rt, _shutdown) = runtime();
let drain = rt.drain.clone();
let stack = Outbound::new(default_config(), rt)
.with_stack(connect)
.push_http_endpoint::<_, http::BoxBody>()
.into_stack()
.push_on_service(http::BoxRequest::layer())
// We need the server-side upgrade layer to annotate the request so that the client
// knows that an HTTP upgrade is in progress.
.push_on_service(svc::layer::mk(|svc| {
http::upgrade::Service::new(svc, drain.clone())
}))
.into_inner();

let svc = stack.new_service(http::Endpoint {
addr: Remote(ServerAddr(addr)),
protocol: http::Version::Http1,
logical_addr: None,
opaque_protocol: false,
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
});

let req = http::Request::builder()
.version(::http::Version::HTTP_11)
.uri("http://foo.example.com")
.extension(http::ClientHandle::new(([192, 0, 2, 101], 40200).into()).0)
// The request has upgrade headers
.header(CONNECTION, "upgrade")
.header(UPGRADE, "linkerdrocks")
.body(hyper::Body::default())
.unwrap();
let rsp = svc.oneshot(req).await.unwrap();
assert_eq!(rsp.status(), http::StatusCode::NO_CONTENT);
// The request did NOT get a linkerd upgrade header.
assert!(rsp.headers().get(WAS_ORIG_PROTO).is_none());
assert_eq!(rsp.version(), ::http::Version::HTTP_11);
}

/// Tests that the the HTTP endpoint stack ignores protocol upgrade hinting for HTTP/2 traffic.
#[tokio::test(flavor = "current_thread")]
async fn orig_proto_http2_noop() {
Expand All @@ -280,7 +397,7 @@ mod test {

// Pretend the upstream is a proxy that supports proto upgrades...
let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand All @@ -295,13 +412,7 @@ mod test {
logical_addr: None,
opaque_protocol: false,
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
metadata: Metadata::new(
None,
support::resolver::ProtocolHint::Http2,
None,
None,
None,
),
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
});

let req = http::Request::builder()
Expand Down
4 changes: 2 additions & 2 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<C, T, B> tower::Service<T> for MakeClient<C, B>
where
T: Clone + Send + Sync + 'static,
T: Param<Settings>,
C: tower::make::MakeConnection<T> + Clone + Unpin + Send + Sync + 'static,
C: tower::make::MakeConnection<(crate::Version, T)> + Clone + Unpin + Send + Sync + 'static,
C::Future: Unpin + Send + 'static,
C::Error: Into<Error>,
C::Connection: Unpin + Send + 'static,
Expand Down Expand Up @@ -134,7 +134,7 @@ type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> +
impl<C, T, B> tower::Service<http::Request<B>> for Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: tower::make::MakeConnection<T> + Clone + Send + Sync + 'static,
C: tower::make::MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send + 'static,
C::Future: Unpin + Send + 'static,
C::Error: Into<Error>,
Expand Down
8 changes: 5 additions & 3 deletions linkerd/proxy/http/src/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ impl<C, T> HyperConnect<C, T> {
pub(super) fn new(connect: C, target: T, absolute_form: bool) -> Self {
HyperConnect {
connect,
absolute_form,
target,
absolute_form,
}
}
}

impl<C, T> tower::Service<hyper::Uri> for HyperConnect<C, T>
where
C: tower::make::MakeConnection<T> + Clone + Send + Sync,
C: tower::make::MakeConnection<(crate::Version, T)> + Clone + Send + Sync,
C::Error: Into<Error>,
C::Future: TryFuture<Ok = C::Connection> + Unpin + Send + 'static,
<C::Future as TryFuture>::Error: Into<Error>,
Expand All @@ -185,7 +185,9 @@ where

fn call(&mut self, _dst: hyper::Uri) -> Self::Future {
HyperConnectFuture {
inner: self.connect.make_connection(self.target.clone()),
inner: self
.connect
.make_connection((crate::Version::Http1, self.target.clone())),
absolute_form: self.absolute_form,
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> +
impl<C, T, B> Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: tower::make::MakeConnection<T> + Clone + Send + Sync + 'static,
C: tower::make::MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send + 'static,
C::Future: Unpin + Send + 'static,
C::Error: Into<Error>,
Expand Down
4 changes: 2 additions & 2 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type ConnectFuture<B> = Pin<Box<dyn Future<Output = Result<Connection<B>>> + Sen

impl<C, B, T> tower::Service<T> for Connect<C, B>
where
C: tower::make::MakeConnection<T>,
C: tower::make::MakeConnection<(crate::Version, T)>,
C::Future: Send + 'static,
C::Connection: AsyncRead + AsyncWrite + Unpin + Send + 'static,
C::Error: Into<Error>,
Expand All @@ -88,7 +88,7 @@ where

let connect = self
.connect
.make_connection(target)
.make_connection((crate::Version::H2, target))
.instrument(trace_span!("connect"));

Box::pin(
Expand Down
Loading

0 comments on commit 8b7dd10

Please sign in to comment.