From b814f9b88779a23498ec9ac4e9f6bb8ce77b982b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 28 Jul 2021 07:01:01 +0000 Subject: [PATCH 1/2] inbound: Consolidate port-based switching The `inbound::prevent_loop` module implements predicates for switching based on the target port. But there's no reason for this control flow to be decoupled from our stack consruction. In preparation for further changes to inbound-port-based policy, this change eliminates the `prevent_loop` module. The tcp connection stack is updated to handle loop detection (instead of the TCP forward stack) so that we are totally unable to initiate looping connections (i.e. if some higher part of the stack were to do something unexpected). --- linkerd/app/core/src/svc.rs | 8 +--- linkerd/app/inbound/src/lib.rs | 59 ++++++++++++++---------- linkerd/app/inbound/src/prevent_loop.rs | 61 ------------------------- 3 files changed, 37 insertions(+), 91 deletions(-) delete mode 100644 linkerd/app/inbound/src/prevent_loop.rs diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 9766a86760..8a6c97e8eb 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -178,14 +178,10 @@ impl Stack { /// `ConnectTimeout` error. /// /// Note that any timeouts errors from the inner service will be wrapped as well. - pub fn push_connect_timeout( + pub fn push_connect_timeout( self, timeout: Duration, - ) -> Stack, impl FnOnce(Error) -> Error + Clone>> - where - S: Service, - S::Error: Into, - { + ) -> Stack, impl FnOnce(Error) -> Error + Clone>> { self.push_timeout(timeout) .push(MapErrLayer::new(move |err: Error| { if err.is::() { diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 8778775490..570af0d224 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -10,7 +10,6 @@ mod allow_discovery; pub mod direct; pub mod http; -mod prevent_loop; mod require_identity; pub mod target; #[cfg(any(test, fuzzing))] @@ -18,7 +17,6 @@ pub(crate) mod test_util; pub use self::target::{HttpEndpoint, Logical, RequestTarget, Target, TcpEndpoint}; use self::{ - prevent_loop::PreventLoop, require_identity::RequireIdentityForPorts, target::{HttpAccept, TcpAccept}, }; @@ -99,6 +97,7 @@ impl Inbound<()> { // forwarding and HTTP proxying). pub fn into_tcp_connect( self, + proxy_port: u16, ) -> Inbound< impl svc::Service< T, @@ -119,11 +118,21 @@ impl Inbound<()> { .. } = config.proxy.connect; + #[derive(Debug, thiserror::Error)] + #[error("inbound connection must not target port {0}")] + struct Loop(u16); + svc::stack(transport::ConnectTcp::new(*keepalive)) - .push_map_target(|t: T| Remote(ServerAddr(([127, 0, 0, 1], t.param()).into()))) // Limits the time we wait for a connection to be established. .push_connect_timeout(*timeout) - .push(svc::stack::BoxFuture::layer()) + // Prevent connections that would target the inbound proxy port from looping. + .push_request_filter(move |t: T| { + let port = t.param(); + if port == proxy_port { + return Err(Loop(port)); + } + Ok(Remote(ServerAddr(([127, 0, 0, 1], port).into()))) + }) }) } @@ -154,7 +163,7 @@ impl Inbound<()> { let serve = async move { let shutdown = self.runtime.drain.clone().signaled(); let stack = self - .into_tcp_connect() + .into_tcp_connect(la.port()) .push_server(la.port(), profiles, gateway) .into_inner(); serve::serve(listen, stack, shutdown).await @@ -173,7 +182,6 @@ where { pub fn push_tcp_forward( self, - server_port: u16, ) -> Inbound< svc::BoxNewService< TcpEndpoint, @@ -185,13 +193,10 @@ where I: Debug + Send + Sync + Unpin + 'static, { self.map_stack(|_, rt, connect| { - let prevent_loop = PreventLoop::from(server_port); - // Forwards TCP streams that cannot be decoded as HTTP. // // Looping is always prevented. connect - .push_request_filter(prevent_loop) .push(rt.metrics.transport.layer_connect()) .push_make_thunk() .push_on_response( @@ -207,7 +212,7 @@ where pub fn push_server( self, - server_port: u16, + proxy_port: u16, profiles: P, gateway: G, ) -> Inbound>> @@ -228,7 +233,7 @@ where // Handles inbound connections that target an opaque port. let opaque = self .clone() - .push_tcp_forward(server_port) + .push_tcp_forward() .map_stack(|_, rt, tcp| { tcp.push_map_target(TcpEndpoint::from) .push(rt.metrics.transport.layer_accept()) @@ -237,7 +242,7 @@ where .into_stack(); // Handles inbound connections that could not be detected as HTTP. - let tcp = self.clone().push_tcp_forward(server_port); + let tcp = self.clone().push_tcp_forward(); // Handles connections targeting the inbound proxy port--either by acting as a gateway to // the outbound stack or by forwarding connections locally (for opauque transport). @@ -277,31 +282,37 @@ where detect_timeout, )) }) - .map_stack(|cfg, _, detect| { + .map_stack(|cfg, rt, detect| { let disable_detect = cfg.disable_protocol_detection_for_ports.clone(); detect .instrument(|_: &_| debug_span!("proxy")) .push_switch( + // If the connection targets a port on which protocol detection is disabled, + // then we forward it directly to the application, bypassing protocol + // detection. move |t: T| -> Result<_, Infallible> { let OrigDstAddr(addr) = t.param(); - if !disable_detect.contains(&addr.port()) { - Ok(svc::Either::A(t)) - } else { - Ok(svc::Either::B(TcpAccept::port_skipped(t))) + if disable_detect.contains(&addr.port()) { + return Ok(svc::Either::B(TcpAccept::port_skipped(t))); } + Ok(svc::Either::A(t)) }, opaque .instrument(|_: &TcpAccept| debug_span!("forward")) .into_inner(), ) - .check_new_service::() - .push_on_response(svc::BoxService::layer()) - .push(svc::BoxNewService::layer()) - }) - .map_stack(|_, rt, accept| { - accept .push_switch( - PreventLoop::from(server_port).to_switch(), + // If the connection targets the inbound proxy port, the connection is most + // likely using opaque transport to target an alternate port, or possible an + // outbound target if the proxy is configured as a gateway. The direct stack + // handles these connections. + move |t: T| -> Result<_, Infallible> { + let OrigDstAddr(a) = t.param(); + if a.port() == proxy_port { + return Ok(svc::Either::B(t)); + } + Ok(svc::Either::A(t)) + }, direct.into_inner(), ) .instrument(|a: &T| { diff --git a/linkerd/app/inbound/src/prevent_loop.rs b/linkerd/app/inbound/src/prevent_loop.rs deleted file mode 100644 index 408bd2de43..0000000000 --- a/linkerd/app/inbound/src/prevent_loop.rs +++ /dev/null @@ -1,61 +0,0 @@ -use crate::TcpEndpoint; -use linkerd_app_core::{ - svc::stack::{Either, Param, Predicate}, - transport::addrs::OrigDstAddr, - Error, -}; -use thiserror::Error; - -/// A connection policy that drops -#[derive(Copy, Clone, Debug)] -pub struct PreventLoop { - port: u16, -} -#[derive(Copy, Clone, Debug)] -pub struct SwitchLoop { - port: u16, -} - -#[derive(Copy, Clone, Debug, Error)] -#[error("inbound connection must not target port {}", self.port)] -pub struct LoopPrevented { - port: u16, -} - -impl From for PreventLoop { - fn from(port: u16) -> Self { - Self { port } - } -} - -impl Predicate for PreventLoop { - type Request = TcpEndpoint; - - fn check(&mut self, t: TcpEndpoint) -> Result { - if t.port == self.port { - Err(LoopPrevented { port: t.port }.into()) - } else { - Ok(t) - } - } -} - -impl PreventLoop { - pub fn to_switch(self) -> SwitchLoop { - SwitchLoop { port: self.port } - } -} - -impl> Predicate for SwitchLoop { - type Request = Either; - - fn check(&mut self, addrs: T) -> Result, Error> { - let OrigDstAddr(addr) = addrs.param(); - tracing::debug!(%addr, self.port); - if addr.port() != self.port { - Ok(Either::A(addrs)) - } else { - Ok(Either::B(addrs)) - } - } -} From 241ab83d4076536790e0493faf077ceb32b71c8e Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 29 Jul 2021 10:20:35 -0700 Subject: [PATCH 2/2] Update linkerd/app/inbound/src/lib.rs --- linkerd/app/inbound/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index bce1fbcd6b..348f00571c 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -311,7 +311,7 @@ where ) .push_switch( // If the connection targets the inbound proxy port, the connection is most - // likely using opaque transport to target an alternate port, or possible an + // likely using opaque transport to target an alternate port, or possibly an // outbound target if the proxy is configured as a gateway. The direct stack // handles these connections. move |t: T| -> Result<_, Infallible> {