Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the connection's HTTP version in transport header #1533

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl Config {
svc::stack(ConnectTcp::new(self.connect.keepalive))
.push(tls::Client::layer(identity))
.push_connect_timeout(self.connect.timeout)
.push_map_target(|(_version, target)| target)
.push(self::client::layer())
.push_on_service(svc::MapErr::layer(Into::into))
.into_new_service()
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl<C> Inbound<C> {
.check_service::<Http>()
.push(transport::metrics::Client::layer(rt.metrics.proxy.transport.clone()))
.check_service::<Http>()
.push_map_target(|(_version, target)| target)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the PR makes sense to me but I'm struggling a bit to understand what we are doing here. My guess is that we do not care about the version here because these clients are going to be used with the main application container? I suppose we don't actually want to check the connection header for an Upgrade value and just forward everything as is?

Similarly, in control.rs, I suppose we'd never need to consider the connection header was set by the client app and we can safely upgrade to H2 which is why we don't care about the version and just use the target?

Would this reasoning be close to what's going on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only in the outbound case can the protocol be upgraded from HTTP/1 to HTTP/2; so in both of these cases:

  1. The target already has the correct HTTP version, and the client is bound to a single HTTP version; and
  2. The inner stack doesn't actually care about the HTTP version (at the moment, at least).

.push(http::client::layer(
config.proxy.connect.h1_settings,
config.proxy.connect.h2_settings,
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = ["test-util"] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
parking_lot = "0.12"
tokio = { version = "1", features = ["time", "macros"] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-test = "0.4"
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl<S> Outbound<S> {
{
let http = self
.clone()
.push_tcp_endpoint::<http::Endpoint>()
.push_tcp_endpoint::<http::Connect>()
.push_http_endpoint()
.push_http_server()
.into_inner();
Expand Down
18 changes: 3 additions & 15 deletions linkerd/app/outbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ use linkerd_app_core::{
profiles::{self, LogicalAddr},
proxy::{api_resolve::ProtocolHint, tap},
svc::Param,
tls,
transport_header::SessionProtocol,
Addr, Conditional, CANONICAL_DST_HEADER,
tls, Addr, Conditional, CANONICAL_DST_HEADER,
};
use std::{net::SocketAddr, str::FromStr};

Expand All @@ -30,6 +28,8 @@ pub type Logical = crate::logical::Logical<Version>;
pub type Concrete = crate::logical::Concrete<Version>;
pub type Endpoint = crate::endpoint::Endpoint<Version>;

pub type Connect = self::endpoint::Connect<Endpoint>;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Route {
logical: Logical,
Expand Down Expand Up @@ -150,18 +150,6 @@ impl Param<client::Settings> for Endpoint {
}
}

impl Param<Option<SessionProtocol>> for Endpoint {
fn param(&self) -> Option<SessionProtocol> {
match self.protocol {
Version::H2 => Some(SessionProtocol::Http2),
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Http2 => Some(SessionProtocol::Http2),
ProtocolHint::Unknown => Some(SessionProtocol::Http1),
},
}
}
}

impl tap::Inspect for Endpoint {
fn src_addr<B>(&self, req: &Request<B>) -> Option<SocketAddr> {
req.extensions().get::<ClientHandle>().map(|c| c.addr)
Expand Down
151 changes: 128 additions & 23 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
use super::{NewRequireIdentity, NewStripProxyError, ProxyConnectionClose};
use crate::Outbound;
use crate::{tcp::opaque_transport, Outbound};
use linkerd_app_core::{
classify, config, errors, http_tracing, metrics,
proxy::{http, tap},
svc::{self, ExtractParam},
tls, Error, Result, CANONICAL_DST_HEADER,
tls,
transport::{self, Remote, ServerAddr},
transport_header::SessionProtocol,
Error, Result, CANONICAL_DST_HEADER,
};

#[derive(Copy, Clone, Debug)]
struct ClientRescue {
emit_headers: bool,
}

#[derive(Clone, Debug)]
pub struct Connect<T> {
version: http::Version,
inner: T,
}

impl<C> Outbound<C> {
pub fn push_http_endpoint<T, B>(self) -> Outbound<svc::ArcNewHttp<T, B>>
where
Expand All @@ -23,7 +32,7 @@ impl<C> Outbound<C> {
+ tap::Inspect,
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
B::Data: Send + 'static,
C: svc::MakeConnection<T> + Clone + Send + Sync + Unpin + 'static,
C: svc::MakeConnection<Connect<T>> + Clone + Send + Sync + Unpin + 'static,
C::Connection: Send + Unpin,
C::Metadata: Send + Unpin,
C::Future: Send + Unpin + 'static,
Expand All @@ -39,7 +48,9 @@ impl<C> Outbound<C> {
// Initiates an HTTP client on the underlying transport. Prior-knowledge HTTP/2
// is typically used (i.e. when communicating with other proxies); though
// HTTP/1.x fallback is supported as needed.
connect
svc::stack(connect.into_inner().into_service())
.check_service::<Connect<T>>()
.push_map_target(|(version, inner)| Connect { version, inner })
.push(http::client::layer(h1_settings, h2_settings))
.push_on_service(svc::MapErr::layer(Into::<Error>::into))
.check_service::<T>()
Expand Down Expand Up @@ -132,17 +143,66 @@ impl errors::HttpRescue<Error> for ClientRescue {
}
}

// === impl Connect ===

impl<T> svc::Param<Option<SessionProtocol>> for Connect<T> {
fn param(&self) -> Option<SessionProtocol> {
match self.version {
http::Version::Http1 => Some(SessionProtocol::Http1),
http::Version::H2 => Some(SessionProtocol::Http2),
}
}
}

impl<T: svc::Param<Remote<ServerAddr>>> svc::Param<Remote<ServerAddr>> for Connect<T> {
fn param(&self) -> Remote<ServerAddr> {
self.inner.param()
}
}

impl<T: svc::Param<tls::ConditionalClientTls>> svc::Param<tls::ConditionalClientTls>
for Connect<T>
{
fn param(&self) -> tls::ConditionalClientTls {
self.inner.param()
}
}

impl<T: svc::Param<Option<opaque_transport::PortOverride>>>
svc::Param<Option<opaque_transport::PortOverride>> for Connect<T>
{
fn param(&self) -> Option<opaque_transport::PortOverride> {
self.inner.param()
}
}

impl<T: svc::Param<Option<http::AuthorityOverride>>> svc::Param<Option<http::AuthorityOverride>>
for Connect<T>
{
fn param(&self) -> Option<http::AuthorityOverride> {
self.inner.param()
}
}

impl<T: svc::Param<transport::labels::Key>> svc::Param<transport::labels::Key> for Connect<T> {
fn param(&self) -> transport::labels::Key {
self.inner.param()
}
olix0r marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(test)]
mod test {
use super::*;
use crate::{http, test_util::*, transport::addrs::*};
use crate::{http, test_util::*};
use ::http::header::{CONNECTION, UPGRADE};
use linkerd_app_core::{
io,
proxy::api_resolve::Metadata,
svc::{NewService, ServiceExt},
Infallible,
};
use std::net::SocketAddr;
use support::resolver::ProtocolHint;

static WAS_ORIG_PROTO: &str = "request-orig-proto";

Expand All @@ -154,7 +214,7 @@ mod test {
let addr = SocketAddr::new([192, 0, 2, 41].into(), 2041);

let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_11));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_11));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand Down Expand Up @@ -191,7 +251,7 @@ mod test {
let addr = SocketAddr::new([192, 0, 2, 41].into(), 2042);

let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand Down Expand Up @@ -230,7 +290,7 @@ mod test {

// Pretend the upstream is a proxy that supports proto upgrades...
let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand All @@ -245,13 +305,7 @@ mod test {
logical_addr: None,
opaque_protocol: false,
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
metadata: Metadata::new(
None,
support::resolver::ProtocolHint::Http2,
None,
None,
None,
),
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
});

let req = http::Request::builder()
Expand All @@ -270,6 +324,63 @@ mod test {
);
}

#[tokio::test(flavor = "current_thread")]
async fn orig_proto_skipped_on_http_upgrade() {
let _trace = linkerd_tracing::test::trace_init();

let addr = SocketAddr::new([192, 0, 2, 41].into(), 2041);

// Pretend the upstream is a proxy that supports proto upgrades. The service needs to
// support both HTTP/1 and HTTP/2 because an HTTP/2 connection is maintained by default and
// HTTP/1 connections are created as-needed.
hawkw marked this conversation as resolved.
Show resolved Hide resolved
let connect = support::connect().endpoint_fn_boxed(addr, |c: http::Connect| {
serve(match svc::Param::param(&c) {
Some(SessionProtocol::Http1) => ::http::Version::HTTP_11,
Some(SessionProtocol::Http2) => ::http::Version::HTTP_2,
None => unreachable!(),
})
});

// Build the outbound server
let (rt, _shutdown) = runtime();
let drain = rt.drain.clone();
let stack = Outbound::new(default_config(), rt)
.with_stack(connect)
.push_http_endpoint::<_, http::BoxBody>()
.into_stack()
.push_on_service(http::BoxRequest::layer())
// We need the server-side upgrade layer to annotate the request so that the client
// knows that an HTTP upgrade is in progress.
.push_on_service(svc::layer::mk(|svc| {
http::upgrade::Service::new(svc, drain.clone())
}))
.into_inner();

let svc = stack.new_service(http::Endpoint {
addr: Remote(ServerAddr(addr)),
protocol: http::Version::Http1,
logical_addr: None,
opaque_protocol: false,
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
});

let req = http::Request::builder()
.version(::http::Version::HTTP_11)
.uri("http://foo.example.com")
.extension(http::ClientHandle::new(([192, 0, 2, 101], 40200).into()).0)
// The request has upgrade headers
.header(CONNECTION, "upgrade")
.header(UPGRADE, "linkerdrocks")
.body(hyper::Body::default())
.unwrap();
let rsp = svc.oneshot(req).await.unwrap();
assert_eq!(rsp.status(), http::StatusCode::NO_CONTENT);
// The request did NOT get a linkerd upgrade header.
assert!(rsp.headers().get(WAS_ORIG_PROTO).is_none());
assert_eq!(rsp.version(), ::http::Version::HTTP_11);
}

/// Tests that the the HTTP endpoint stack ignores protocol upgrade hinting for HTTP/2 traffic.
#[tokio::test(flavor = "current_thread")]
async fn orig_proto_http2_noop() {
Expand All @@ -279,7 +390,7 @@ mod test {

// Pretend the upstream is a proxy that supports proto upgrades...
let connect = support::connect()
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));

// Build the outbound server
let (rt, _shutdown) = runtime();
Expand All @@ -294,13 +405,7 @@ mod test {
logical_addr: None,
opaque_protocol: false,
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
metadata: Metadata::new(
None,
support::resolver::ProtocolHint::Http2,
None,
None,
None,
),
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
});

let req = http::Request::builder()
Expand Down
4 changes: 2 additions & 2 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<C, T, B> tower::Service<T> for MakeClient<C, B>
where
T: Clone + Send + Sync + 'static,
T: Param<Settings>,
C: MakeConnection<T> + Clone + Unpin + Send + Sync + 'static,
C: MakeConnection<(crate::Version, T)> + Clone + Unpin + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Metadata: Send,
C::Future: Unpin + Send + 'static,
Expand Down Expand Up @@ -133,7 +133,7 @@ type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> +
impl<C, T, B> Service<http::Request<B>> for Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: MakeConnection<T> + Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static,
C::Error: Into<Error>,
Expand Down
8 changes: 5 additions & 3 deletions linkerd/proxy/http/src/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,15 @@ impl<C, T> HyperConnect<C, T> {
pub(super) fn new(connect: C, target: T, absolute_form: bool) -> Self {
HyperConnect {
connect,
absolute_form,
target,
absolute_form,
}
}
}

impl<C, T> Service<hyper::Uri> for HyperConnect<C, T>
where
C: MakeConnection<T> + Clone + Send + Sync,
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this kind of makes me wonder if the Connect<T> type should just live in linkerd-proxy-http, but i guess then we wouldn't be able to write all the necessary Param impls for Connect<T>...it's a shame we can't just have a

impl<T, P> Param<P> for Connect<T> where T: Param<P> { ... }

without conflicting with the existing Param blanket impls :(

Copy link
Member Author

@olix0r olix0r Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, agreed that it would be a lot nicer with blanket impls. in general, though, I prefer to keep the target types in the modules from which they are emitted.

C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static,
T: Clone + Send + Sync,
Expand All @@ -186,7 +186,9 @@ where

fn call(&mut self, _dst: hyper::Uri) -> Self::Future {
HyperConnectFuture {
inner: self.connect.connect(self.target.clone()),
inner: self
.connect
.connect((crate::Version::Http1, self.target.clone())),
absolute_form: self.absolute_form,
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> +
impl<C, T, B> Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: MakeConnection<T> + Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static,
B: hyper::body::HttpBody + Send + 'static,
Expand Down
4 changes: 2 additions & 2 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type ConnectFuture<B> = Pin<Box<dyn Future<Output = Result<Connection<B>>> + Sen

impl<C, B, T> Service<T> for Connect<C, B>
where
C: MakeConnection<T>,
C: MakeConnection<(crate::Version, T)>,
C::Connection: Send + Unpin + 'static,
C::Metadata: Send,
C::Future: Send + 'static,
Expand All @@ -88,7 +88,7 @@ where

let connect = self
.connect
.connect(target)
.connect((crate::Version::H2, target))
.instrument(trace_span!("connect"));

Box::pin(
Expand Down
Loading