-
Notifications
You must be signed in to change notification settings - Fork 271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
inbound: Consolidate port-based switching #1170
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,15 +10,13 @@ | |
mod allow_discovery; | ||
pub mod direct; | ||
pub mod http; | ||
mod prevent_loop; | ||
mod require_identity; | ||
pub mod target; | ||
#[cfg(any(test, fuzzing))] | ||
pub(crate) mod test_util; | ||
|
||
pub use self::target::{HttpEndpoint, Logical, RequestTarget, Target, TcpEndpoint}; | ||
use self::{ | ||
prevent_loop::PreventLoop, | ||
require_identity::RequireIdentityForPorts, | ||
target::{HttpAccept, TcpAccept}, | ||
}; | ||
|
@@ -107,6 +105,7 @@ impl Inbound<()> { | |
// forwarding and HTTP proxying). | ||
pub fn into_tcp_connect<T>( | ||
self, | ||
proxy_port: u16, | ||
) -> Inbound< | ||
impl svc::Service< | ||
T, | ||
|
@@ -127,11 +126,21 @@ impl Inbound<()> { | |
.. | ||
} = config.proxy.connect; | ||
|
||
#[derive(Debug, thiserror::Error)] | ||
#[error("inbound connection must not target port {0}")] | ||
struct Loop(u16); | ||
|
||
svc::stack(transport::ConnectTcp::new(*keepalive)) | ||
.push_map_target(|t: T| Remote(ServerAddr(([127, 0, 0, 1], t.param()).into()))) | ||
// Limits the time we wait for a connection to be established. | ||
.push_connect_timeout(*timeout) | ||
.push(svc::stack::BoxFuture::layer()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
// Prevent connections that would target the inbound proxy port from looping. | ||
.push_request_filter(move |t: T| { | ||
let port = t.param(); | ||
if port == proxy_port { | ||
return Err(Loop(port)); | ||
} | ||
Ok(Remote(ServerAddr(([127, 0, 0, 1], port).into()))) | ||
}) | ||
}) | ||
} | ||
|
||
|
@@ -162,7 +171,7 @@ impl Inbound<()> { | |
let serve = async move { | ||
let shutdown = self.runtime.drain.clone().signaled(); | ||
let stack = self | ||
.into_tcp_connect() | ||
.into_tcp_connect(la.port()) | ||
.push_server(la.port(), profiles, gateway) | ||
.into_inner(); | ||
serve::serve(listen, stack, shutdown).await | ||
|
@@ -181,7 +190,6 @@ where | |
{ | ||
pub fn push_tcp_forward<I>( | ||
self, | ||
server_port: u16, | ||
) -> Inbound< | ||
svc::BoxNewService< | ||
TcpEndpoint, | ||
|
@@ -193,13 +201,10 @@ where | |
I: Debug + Send + Sync + Unpin + 'static, | ||
{ | ||
self.map_stack(|_, rt, connect| { | ||
let prevent_loop = PreventLoop::from(server_port); | ||
|
||
// Forwards TCP streams that cannot be decoded as HTTP. | ||
// | ||
// Looping is always prevented. | ||
connect | ||
.push_request_filter(prevent_loop) | ||
.push(rt.metrics.transport.layer_connect()) | ||
.push_make_thunk() | ||
.push_on_response( | ||
|
@@ -215,7 +220,7 @@ where | |
|
||
pub fn push_server<T, I, G, GSvc, P>( | ||
self, | ||
server_port: u16, | ||
proxy_port: u16, | ||
profiles: P, | ||
gateway: G, | ||
) -> Inbound<svc::BoxNewService<T, svc::BoxService<I, (), Error>>> | ||
|
@@ -236,7 +241,7 @@ where | |
// Handles inbound connections that target an opaque port. | ||
let opaque = self | ||
.clone() | ||
.push_tcp_forward(server_port) | ||
.push_tcp_forward() | ||
.map_stack(|_, rt, tcp| { | ||
tcp.push_map_target(TcpEndpoint::from) | ||
.push(rt.metrics.transport.layer_accept()) | ||
|
@@ -245,7 +250,7 @@ where | |
.into_stack(); | ||
|
||
// Handles inbound connections that could not be detected as HTTP. | ||
let tcp = self.clone().push_tcp_forward(server_port); | ||
let tcp = self.clone().push_tcp_forward(); | ||
|
||
// Handles connections targeting the inbound proxy port--either by acting as a gateway to | ||
// the outbound stack or by forwarding connections locally (for opauque transport). | ||
|
@@ -285,31 +290,37 @@ where | |
identity: rt.identity.clone(), | ||
})) | ||
}) | ||
.map_stack(|cfg, _, detect| { | ||
.map_stack(|cfg, rt, detect| { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One more question for this stack: do we need to consider this closure arg? I don't see it being used, why not leave it as ignored? I think it makes more sense to not ignore it, as a newbie it helps me visualise it better so I'm just curious if it's for visibility. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's used below the diff (I combined two |
||
let disable_detect = cfg.disable_protocol_detection_for_ports.clone(); | ||
detect | ||
.instrument(|_: &_| debug_span!("proxy")) | ||
.push_switch( | ||
// If the connection targets a port on which protocol detection is disabled, | ||
// then we forward it directly to the application, bypassing protocol | ||
// detection. | ||
move |t: T| -> Result<_, Infallible> { | ||
let OrigDstAddr(addr) = t.param(); | ||
if !disable_detect.contains(&addr.port()) { | ||
Ok(svc::Either::A(t)) | ||
} else { | ||
Ok(svc::Either::B(TcpAccept::port_skipped(t))) | ||
if disable_detect.contains(&addr.port()) { | ||
return Ok(svc::Either::B(TcpAccept::port_skipped(t))); | ||
} | ||
Ok(svc::Either::A(t)) | ||
}, | ||
opaque | ||
.instrument(|_: &TcpAccept| debug_span!("forward")) | ||
.into_inner(), | ||
) | ||
.check_new_service::<T, I>() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm curious why these have been removed. If I understand correctly, First, I'm not sure if my understanding here is fully correct, I see that in the function implementation we say "stack serves T typed targets", which is another way of saying the target we have and the request type are fit for this stack? Second, are these checks unnecessary because we have replaced the outer stack with a switch? (I guess checks is the wrong way to put it since the first two layers from the bottom are just allocating it all on the heap?) Sorry for the long comment! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These checks are really just compile-time assertions. They can be helpful for debugging when a stack doesn't compile, but there's no inherent benefit to having them aside from clarity. We could probably restore this without any issue, though.... |
||
.push_on_response(svc::BoxService::layer()) | ||
.push(svc::BoxNewService::layer()) | ||
}) | ||
.map_stack(|_, rt, accept| { | ||
accept | ||
.push_switch( | ||
PreventLoop::from(server_port).to_switch(), | ||
// If the connection targets the inbound proxy port, the connection is most | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the comment here! Before reviewing, I went through the inbound stack to see if I can figure out what's going on. The comment was really useful to test my understanding. |
||
// likely using opaque transport to target an alternate port, or possibly an | ||
// outbound target if the proxy is configured as a gateway. The direct stack | ||
// handles these connections. | ||
move |t: T| -> Result<_, Infallible> { | ||
let OrigDstAddr(a) = t.param(); | ||
if a.port() == proxy_port { | ||
return Ok(svc::Either::B(t)); | ||
} | ||
Ok(svc::Either::A(t)) | ||
}, | ||
direct.into_inner(), | ||
) | ||
.instrument(|a: &T| { | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These constraints are unnecessary and caused us to have to document
T
. Easier to just remove the constraints.