Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inbound: Consolidate port-based switching #1170

Merged
merged 3 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,10 @@ impl<S> Stack<S> {
/// `ConnectTimeout` error.
///
/// Note that any timeouts errors from the inner service will be wrapped as well.
pub fn push_connect_timeout<T>(
pub fn push_connect_timeout(
self,
timeout: Duration,
) -> Stack<stack::MapErr<tower::timeout::Timeout<S>, impl FnOnce(Error) -> Error + Clone>>
where
S: Service<T>,
S::Error: Into<Error>,
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These constraints are unnecessary and caused us to have to document T. Easier to just remove the constraints.

) -> Stack<stack::MapErr<tower::timeout::Timeout<S>, impl FnOnce(Error) -> Error + Clone>> {
self.push_timeout(timeout)
.push(MapErrLayer::new(move |err: Error| {
if err.is::<tower::timeout::error::Elapsed>() {
Expand Down
59 changes: 35 additions & 24 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
mod allow_discovery;
pub mod direct;
pub mod http;
mod prevent_loop;
mod require_identity;
pub mod target;
#[cfg(any(test, fuzzing))]
pub(crate) mod test_util;

pub use self::target::{HttpEndpoint, Logical, RequestTarget, Target, TcpEndpoint};
use self::{
prevent_loop::PreventLoop,
require_identity::RequireIdentityForPorts,
target::{HttpAccept, TcpAccept},
};
Expand Down Expand Up @@ -107,6 +105,7 @@ impl Inbound<()> {
// forwarding and HTTP proxying).
pub fn into_tcp_connect<T>(
self,
proxy_port: u16,
) -> Inbound<
impl svc::Service<
T,
Expand All @@ -127,11 +126,21 @@ impl Inbound<()> {
..
} = config.proxy.connect;

#[derive(Debug, thiserror::Error)]
#[error("inbound connection must not target port {0}")]
struct Loop(u16);

svc::stack(transport::ConnectTcp::new(*keepalive))
.push_map_target(|t: T| Remote(ServerAddr(([127, 0, 0, 1], t.param()).into())))
// Limits the time we wait for a connection to be established.
.push_connect_timeout(*timeout)
.push(svc::stack::BoxFuture::layer())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This BoxFuture layer is redundant -- there's already a BoxFuture in the HTTP router where it's actually needed.

// Prevent connections that would target the inbound proxy port from looping.
.push_request_filter(move |t: T| {
let port = t.param();
if port == proxy_port {
return Err(Loop(port));
}
Ok(Remote(ServerAddr(([127, 0, 0, 1], port).into())))
})
})
}

Expand Down Expand Up @@ -162,7 +171,7 @@ impl Inbound<()> {
let serve = async move {
let shutdown = self.runtime.drain.clone().signaled();
let stack = self
.into_tcp_connect()
.into_tcp_connect(la.port())
.push_server(la.port(), profiles, gateway)
.into_inner();
serve::serve(listen, stack, shutdown).await
Expand All @@ -181,7 +190,6 @@ where
{
pub fn push_tcp_forward<I>(
self,
server_port: u16,
) -> Inbound<
svc::BoxNewService<
TcpEndpoint,
Expand All @@ -193,13 +201,10 @@ where
I: Debug + Send + Sync + Unpin + 'static,
{
self.map_stack(|_, rt, connect| {
let prevent_loop = PreventLoop::from(server_port);

// Forwards TCP streams that cannot be decoded as HTTP.
//
// Looping is always prevented.
connect
.push_request_filter(prevent_loop)
.push(rt.metrics.transport.layer_connect())
.push_make_thunk()
.push_on_response(
Expand All @@ -215,7 +220,7 @@ where

pub fn push_server<T, I, G, GSvc, P>(
self,
server_port: u16,
proxy_port: u16,
profiles: P,
gateway: G,
) -> Inbound<svc::BoxNewService<T, svc::BoxService<I, (), Error>>>
Expand All @@ -236,7 +241,7 @@ where
// Handles inbound connections that target an opaque port.
let opaque = self
.clone()
.push_tcp_forward(server_port)
.push_tcp_forward()
.map_stack(|_, rt, tcp| {
tcp.push_map_target(TcpEndpoint::from)
.push(rt.metrics.transport.layer_accept())
Expand All @@ -245,7 +250,7 @@ where
.into_stack();

// Handles inbound connections that could not be detected as HTTP.
let tcp = self.clone().push_tcp_forward(server_port);
let tcp = self.clone().push_tcp_forward();

// Handles connections targeting the inbound proxy port--either by acting as a gateway to
// the outbound stack or by forwarding connections locally (for opauque transport).
Expand Down Expand Up @@ -285,31 +290,37 @@ where
identity: rt.identity.clone(),
}))
})
.map_stack(|cfg, _, detect| {
.map_stack(|cfg, rt, detect| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more question for this stack: do we need to consider this closure arg? I don't see it being used, why not leave it as ignored? I think it makes more sense to not ignore it, as a newbie it helps me visualise it better so I'm just curious if it's for visibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used below the diff (I combined two map_stacks into one). Our compilation settings (#[deny(warning)]) won't actually let us compile with unused variables :)

let disable_detect = cfg.disable_protocol_detection_for_ports.clone();
detect
.instrument(|_: &_| debug_span!("proxy"))
.push_switch(
// If the connection targets a port on which protocol detection is disabled,
// then we forward it directly to the application, bypassing protocol
// detection.
move |t: T| -> Result<_, Infallible> {
let OrigDstAddr(addr) = t.param();
if !disable_detect.contains(&addr.port()) {
Ok(svc::Either::A(t))
} else {
Ok(svc::Either::B(TcpAccept::port_skipped(t)))
if disable_detect.contains(&addr.port()) {
return Ok(svc::Either::B(TcpAccept::port_skipped(t)));
}
Ok(svc::Either::A(t))
},
opaque
.instrument(|_: &TcpAccept| debug_span!("forward"))
.into_inner(),
)
.check_new_service::<T, I>()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why these have been removed. If I understand correctly, check_new_service::<T, I> would check if our target T implements the same traits as a type I so we can check whether our target provides Peek, AsyncRead (and write).

First, I'm not sure if my understanding here is fully correct, I see that in the function implementation we say "stack serves T typed targets", which is another way of saying the target we have and the request type are fit for this stack?

Second, are these checks unnecessary because we have replaced the outer stack with a switch? (I guess checks is the wrong way to put it since the first two layers from the bottom are just allocating it all on the heap?)

Sorry for the long comment!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These checks are really just compile-time assertions. They can be helpful for debugging when a stack doesn't compile, but there's no inherent benefit to having them aside from clarity. We could probably restore this without any issue, though....

.push_on_response(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
.map_stack(|_, rt, accept| {
accept
.push_switch(
PreventLoop::from(server_port).to_switch(),
// If the connection targets the inbound proxy port, the connection is most
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment here! Before reviewing, I went through the inbound stack to see if I can figure out what's going on. The comment was really useful to test my understanding.

// likely using opaque transport to target an alternate port, or possible an
olix0r marked this conversation as resolved.
Show resolved Hide resolved
// outbound target if the proxy is configured as a gateway. The direct stack
// handles these connections.
move |t: T| -> Result<_, Infallible> {
let OrigDstAddr(a) = t.param();
if a.port() == proxy_port {
return Ok(svc::Either::B(t));
}
Ok(svc::Either::A(t))
},
direct.into_inner(),
)
.instrument(|a: &T| {
Expand Down
61 changes: 0 additions & 61 deletions linkerd/app/inbound/src/prevent_loop.rs

This file was deleted.