Skip to content

Commit

Permalink
http: Use ExtractParam to support dynamic detect configuration (#1174)
Browse files Browse the repository at this point in the history
Following #1169, this change modifies the `detect` middleware to use
`ExtractParam`. In doing this, it makes sense to modify the default
implementations of `ExtractParam` so that we can provide a static
configuration to the module that is cloned for all targets without
modification.

This change does not alter any functionality, but sets up to make the
HTTP detection layer configured per-target.
  • Loading branch information
olix0r authored Aug 2, 2021
1 parent fd2c1bb commit 79de26d
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 80 deletions.
5 changes: 1 addition & 4 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ impl Config {
},
)
.push(svc::BoxNewService::layer())
.push(detect::NewDetectService::layer(
DETECT_TIMEOUT,
http::DetectHttp::default(),
))
.push(detect::NewDetectService::layer(detect::Config::<http::DetectHttp>::from_timeout(DETECT_TIMEOUT)))
.push(metrics.transport.layer_accept())
.push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
// TODO this should use an admin-specific target type.
Expand Down
10 changes: 9 additions & 1 deletion linkerd/app/core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use crate::exp_backoff::ExponentialBackoff;
use crate::{
proxy::http::{h1, h2},
proxy::http::{self, h1, h2},
svc::Param,
transport::{Keepalive, ListenAddr},
};
Expand Down Expand Up @@ -50,6 +50,14 @@ pub type PortSet = HashSet<u16, BuildHasherDefault<PortHasher>>;
#[derive(Default)]
pub struct PortHasher(u16);

// === impl ProxyConfig ===

impl ProxyConfig {
pub fn detect_http(&self) -> linkerd_detect::Config<http::DetectHttp> {
linkerd_detect::Config::from_timeout(self.detect_protocol_timeout)
}
}

// === impl ServerConfig ===

impl Param<ListenAddr> for ServerConfig {
Expand Down
4 changes: 1 addition & 3 deletions linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ where
let ProxyConfig {
buffer_capacity,
cache_max_idle_age,
detect_protocol_timeout,
dispatch_timeout,
..
} = inbound.config().proxy.clone();
Expand Down Expand Up @@ -232,8 +231,7 @@ where
.push(svc::Filter::<ClientInfo, _>::layer(HttpLegacy::try_from))
.push(svc::BoxNewService::layer())
.push(detect::NewDetectService::layer(
detect_protocol_timeout,
http::DetectHttp::default(),
inbound.config().proxy.detect_http(),
));

// When a transported connection is received, use the header's target to
Expand Down
5 changes: 1 addition & 4 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,7 @@ where
))
.push_map_target(detect::allow_timeout)
.push(svc::BoxNewService::layer())
.push(detect::NewDetectService::layer(
detect_timeout,
http::DetectHttp::default(),
))
.push(detect::NewDetectService::layer(cfg.proxy.detect_http()))
.check_new_service::<TcpAccept, _>()
.push_request_filter(require_id)
.push(rt.metrics.transport.layer_accept())
Expand Down
22 changes: 7 additions & 15 deletions linkerd/app/outbound/src/http/detect.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{http, Outbound};
use linkerd_app_core::{
config::{ProxyConfig, ServerConfig},
config::ServerConfig,
detect, io,
svc::{self, Param},
Error,
Error, Infallible,
};
use tracing::debug_span;

Expand All @@ -29,11 +29,7 @@ impl<N> Outbound<N> {
U: From<(http::Version, T)> + svc::Param<http::Version> + 'static,
{
self.map_stack(|config, rt, tcp| {
let ProxyConfig {
server: ServerConfig { h2_settings, .. },
detect_protocol_timeout,
..
} = config.proxy;
let ServerConfig { h2_settings, .. } = config.proxy.server;

let skipped = tcp
.clone()
Expand All @@ -58,19 +54,15 @@ impl<N> Outbound<N> {
.check_new_service::<(Option<http::Version>, T), _>()
.push_map_target(detect::allow_timeout)
.push(svc::BoxNewService::layer())
.push(detect::NewDetectService::layer(
detect_protocol_timeout,
http::DetectHttp::default(),
))
.push(detect::NewDetectService::layer(config.proxy.detect_http()))
.push_switch(
// When the target is marked as as opaque, we skip HTTP
// detection and just use the TCP stack directly.
|target: T| -> Result<_, Error> {
|target: T| -> Result<_, Infallible> {
if let Some(Skip) = target.param() {
Ok(svc::Either::B(target))
} else {
Ok(svc::Either::A(target))
return Ok(svc::Either::B(target));
}
Ok(svc::Either::A(target))
},
skipped,
)
Expand Down
7 changes: 2 additions & 5 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ impl Outbound<svc::BoxNewHttp<http::Endpoint>> {

let http_endpoint = self.into_stack();

let detect_http = config.proxy.detect_http();
let Config {
allow_discovery,
proxy:
ProxyConfig {
server: ServerConfig { h2_settings, .. },
dispatch_timeout,
max_in_flight_requests,
detect_protocol_timeout,
buffer_capacity,
cache_max_idle_age,
..
Expand Down Expand Up @@ -219,10 +219,7 @@ impl Outbound<svc::BoxNewHttp<http::Endpoint>> {
.push_cache(cache_max_idle_age)
.push_map_target(detect::allow_timeout)
.push(svc::BoxNewService::layer())
.push(detect::NewDetectService::layer(
detect_protocol_timeout,
http::DetectHttp::default(),
))
.push(detect::NewDetectService::layer(detect_http))
.push(rt.metrics.transport.layer_accept())
.instrument(|a: &tcp::Accept| info_span!("ingress", orig_dst = %a.orig_dst))
.push_map_target(|a: T| {
Expand Down
90 changes: 55 additions & 35 deletions linkerd/detect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use bytes::BytesMut;
use linkerd_error::Error;
use linkerd_io as io;
use linkerd_stack::{layer, NewService};
use linkerd_stack::{layer, ExtractParam, NewService};
use std::{
fmt,
future::Future,
Expand All @@ -27,28 +27,30 @@ pub trait Detect<I>: Clone + Send + Sync + 'static {
pub type DetectResult<P> = Result<Option<P>, DetectTimeoutError<P>>;

#[derive(Error)]
#[error("{} protocol detection timed out after {:?}", std::any::type_name::<P>(), .0)]
#[error("{} protocol detection timed out after {0:?}", std::any::type_name::<P>())]
pub struct DetectTimeoutError<P>(time::Duration, std::marker::PhantomData<P>);

#[derive(Copy, Clone, Debug)]
pub struct NewDetectService<D, N> {
pub struct Config<D> {
pub detect: D,
pub capacity: usize,
pub timeout: time::Duration,
}

#[derive(Copy, Clone, Debug)]
pub struct NewDetectService<P, D, N> {
inner: N,
detect: D,
capacity: usize,
timeout: time::Duration,
params: P,
_detect: std::marker::PhantomData<fn() -> D>,
}

#[derive(Copy, Clone, Debug)]
pub struct DetectService<T, D, N> {
target: T,
config: Config<D>,
inner: N,
detect: D,
capacity: usize,
timeout: time::Duration,
}

const BUFFER_CAPACITY: usize = 1024;

pub fn allow_timeout<P, T>((p, t): (DetectResult<P>, T)) -> (Option<P>, T) {
match p {
Ok(p) => (p, t),
Expand All @@ -59,52 +61,67 @@ pub fn allow_timeout<P, T>((p, t): (DetectResult<P>, T)) -> (Option<P>, T) {
}
}

// === impl Config ===

impl<D: Default> Config<D> {
const DEFAULT_CAPACITY: usize = 1024;

pub fn from_timeout(timeout: time::Duration) -> Self {
Self {
detect: D::default(),
capacity: Self::DEFAULT_CAPACITY,
timeout,
}
}
}

// === impl NewDetectService ===

impl<D: Clone, N> NewDetectService<D, N> {
pub fn new(timeout: time::Duration, detect: D, inner: N) -> Self {
impl<P, D, N> NewDetectService<P, D, N> {
pub fn new(params: P, inner: N) -> Self {
Self {
detect,
inner,
timeout,
capacity: BUFFER_CAPACITY,
params,
_detect: std::marker::PhantomData,
}
}

pub fn layer(
timeout: time::Duration,
detect: D,
) -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(move |inner| Self::new(timeout, detect.clone(), inner))
pub fn layer(params: P) -> impl layer::Layer<N, Service = Self> + Clone
where
P: Clone,
{
layer::mk(move |inner| Self::new(params.clone(), inner))
}
}

impl<D: Clone, N: Clone, T> NewService<T> for NewDetectService<D, N> {
impl<T, P, D, N: Clone> NewService<T> for NewDetectService<P, D, N>
where
P: ExtractParam<Config<D>, T>,
{
type Service = DetectService<T, D, N>;

fn new_service(&mut self, target: T) -> DetectService<T, D, N> {
let config = self.params.extract_param(&target);
DetectService {
target,
detect: self.detect.clone(),
config,
inner: self.inner.clone(),
capacity: self.capacity,
timeout: self.timeout,
}
}
}

// === impl DetectService ===

impl<S, T, D, N, I> tower::Service<I> for DetectService<T, D, N>
impl<I, T, D, N, NSvc> tower::Service<I> for DetectService<T, D, N>
where
T: Clone + Send + 'static,
I: Send + 'static,
D: Detect<I>,
D::Protocol: std::fmt::Debug,
N: NewService<(DetectResult<D::Protocol>, T), Service = S> + Clone + Send + 'static,
S: tower::Service<io::PrefixedIo<I>, Response = ()> + Send,
S::Error: Into<Error>,
S::Future: Send,
N: NewService<(DetectResult<D::Protocol>, T), Service = NSvc> + Clone + Send + 'static,
NSvc: tower::Service<io::PrefixedIo<I>, Response = ()> + Send,
NSvc::Error: Into<Error>,
NSvc::Future: Send,
{
type Response = ();
type Error = Error;
Expand All @@ -116,15 +133,18 @@ where
}

fn call(&mut self, mut io: I) -> Self::Future {
let mut inner = self.inner.clone();
let mut buf = BytesMut::with_capacity(self.capacity);
let detect = self.detect.clone();
let Config {
detect,
capacity,
timeout,
} = self.config.clone();
let target = self.target.clone();
let timeout = self.timeout;
let mut inner = self.inner.clone();
Box::pin(async move {
trace!("Starting protocol detection");
trace!(%capacity, ?timeout, "Starting protocol detection");
let t0 = time::Instant::now();

let mut buf = BytesMut::with_capacity(capacity);
let detected = match time::timeout(timeout, detect.detect(&mut io, &mut buf)).await {
Ok(Ok(protocol)) => {
debug!(?protocol, elapsed = ?t0.elapsed(), "DetectResult");
Expand Down
16 changes: 3 additions & 13 deletions linkerd/stack/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,10 @@ impl<T: ToOwned> Param<T::Owned> for T {

/// === ExtractParam ===
impl<P, T: Param<P>> ExtractParam<P, T> for () {
impl<P: ToOwned, T> ExtractParam<P::Owned, T> for P {
#[inline]
fn extract_param(&self, t: &T) -> P {
t.param()
}
}

impl<F, P, T> ExtractParam<P, T> for F
where
F: Fn(&T) -> P,
{
#[inline]
fn extract_param(&self, target: &T) -> P {
(self)(target)
fn extract_param(&self, _: &T) -> P::Owned {
self.to_owned()
}
}

Expand Down

0 comments on commit 79de26d

Please sign in to comment.