Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

profiles: Avoid creating a default route stack #1223

Merged
merged 6 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,6 @@ dependencies = [
"linkerd-error",
"linkerd-stack",
"pin-project",
"tower",
]

[[package]]
Expand Down Expand Up @@ -1297,6 +1296,7 @@ dependencies = [
"linkerd-addr",
"linkerd-dns-name",
"linkerd-error",
"linkerd-http-box",
"linkerd-proxy-api-resolve",
"linkerd-stack",
"linkerd-tonic-watch",
Expand Down
12 changes: 6 additions & 6 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,12 @@ impl<C> Inbound<C> {
// request has not been received for `cache_max_idle_age`.
http.clone()
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.push_on_response(http::BoxRequest::layer())
// The HTTP stack doesn't use the profile resolution, so drop it.
.push_map_target(Logical::from)
.push_on_response(http::BoxResponse::layer())
.push(profiles::http::route_request::layer(
svc::proxies()
// Sets the route as a request extension so that it can be used
// by tap.
.push_http_insert_target::<dst::Route>()
.push_on_response(http::BoxRequest::layer())
// Records per-route metrics.
.push(
rt.metrics
Expand All @@ -132,17 +130,19 @@ impl<C> Inbound<C> {
// Sets the per-route response classifier as a request
// extension.
.push(classify::NewClassify::layer())
.check_new_clone::<dst::Route>()
// Sets the route as a request extension so that it can be used
// by tap.
.push_http_insert_target::<dst::Route>()
.push_map_target(|(route, logical): (profiles::http::Route, Profile)| {
dst::Route {
route,
addr: logical.addr,
direction: metrics::Direction::In,
}
})
.push_on_response(http::BoxResponse::layer())
.into_inner(),
))
.push_on_response(http::BoxResponse::layer())
.push_switch(
// If the profile was resolved to a logical (service) address, build a profile
// stack to include route-level metrics, etc. Otherwise, skip this stack and use
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,11 @@ impl<E> Outbound<E> {
.push_spawn_buffer(buffer_capacity),
)
.push_cache(cache_max_idle_age)
.push_on_response(http::BoxResponse::layer())
// Note: routes can't exert backpressure.
.push(profiles::http::route_request::layer(
svc::proxies()
.push_on_response(http::BoxRequest::layer())
.push(
rt.metrics
.http_route_actual
Expand All @@ -133,8 +135,10 @@ impl<E> Outbound<E> {
// extension.
.push(classify::NewClassify::layer())
.push_map_target(Logical::mk_route)
.push_on_response(http::BoxResponse::layer())
.into_inner(),
))
.push_on_response(http::BoxRequest::layer())
// Strips headers that may be set by this proxy and add an outbound
// canonical-dst-header. The response body is boxed unify the profile
// stack's response type with that of to endpoint stack.
Expand Down
1 change: 0 additions & 1 deletion linkerd/http-box/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ http = "0.2"
http-body = "0.4"
linkerd-error = { path = "../error" }
linkerd-stack = { path = "../stack" }
tower = {version = "0.4", default-features = false }
pin-project = "1"
10 changes: 5 additions & 5 deletions linkerd/http-box/src/erase_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

use crate::BoxBody;
use linkerd_error::Error;
use linkerd_stack::{layer, Proxy};
use linkerd_stack::{layer, Proxy, Service};
use std::task::{Context, Poll};

/// Boxes request bodies, erasing the original type.
///
/// This is *very* similar to the [`BoxRequest`](crate::request::BoxRequest)
/// middleware. However, that middleware is generic over a specific body type
/// that is erased. A given instance of `EraseRequest` can only erase the type
/// that is erased. A given instance of `BoxRequest` can only erase the type
/// of one particular `Body` type, while this middleware will erase bodies of
/// *any* type.
///
Expand Down Expand Up @@ -43,12 +43,12 @@ impl<S: Clone> Clone for EraseRequest<S> {
}
}

impl<S, B> tower::Service<http::Request<B>> for EraseRequest<S>
impl<S, B> Service<http::Request<B>> for EraseRequest<S>
where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
S: tower::Service<http::Request<BoxBody>>,
S: Service<http::Request<BoxBody>>,
{
type Response = S::Response;
type Error = S::Error;
Expand All @@ -70,7 +70,7 @@ where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
S: tower::Service<P::Request>,
S: Service<P::Request>,
P: Proxy<http::Request<BoxBody>, S>,
{
type Request = P::Request;
Expand Down
31 changes: 25 additions & 6 deletions linkerd/http-box/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

use crate::{erase_request::EraseRequest, BoxBody};
use linkerd_error::Error;
use linkerd_stack::layer;
use linkerd_stack::{layer, Proxy, Service};
use std::{
marker::PhantomData,
task::{Context, Poll},
};

#[derive(Debug)]
pub struct BoxRequest<S, B>(S, PhantomData<fn(B)>);
pub struct BoxRequest<B, S>(S, PhantomData<fn(B)>);

impl<S, B> BoxRequest<S, B> {
impl<B, S> BoxRequest<B, S> {
pub fn layer() -> impl layer::Layer<S, Service = Self> + Clone + Copy {
layer::mk(|inner| BoxRequest(inner, PhantomData))
}
Expand All @@ -24,18 +24,18 @@ impl<S> BoxRequest<S, ()> {
}
}

impl<S: Clone, B> Clone for BoxRequest<S, B> {
impl<B, S: Clone> Clone for BoxRequest<B, S> {
fn clone(&self) -> Self {
BoxRequest(self.0.clone(), self.1)
}
}

impl<S, B> tower::Service<http::Request<B>> for BoxRequest<S, B>
impl<B, S> Service<http::Request<B>> for BoxRequest<B, S>
where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
S: tower::Service<http::Request<BoxBody>>,
S: Service<http::Request<BoxBody>>,
{
type Response = S::Response;
type Error = S::Error;
Expand All @@ -51,3 +51,22 @@ where
self.0.call(req.map(BoxBody::new))
}
}

impl<B, S, P> Proxy<http::Request<B>, S> for BoxRequest<B, P>
where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
S: Service<P::Request>,
P: Proxy<http::Request<BoxBody>, S>,
{
type Request = P::Request;
type Response = P::Response;
type Error = P::Error;
type Future = P::Future;

#[inline]
fn proxy(&self, inner: &mut S, req: http::Request<B>) -> Self::Future {
self.0.proxy(inner, req.map(BoxBody::new))
}
}
25 changes: 22 additions & 3 deletions linkerd/http-box/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::BoxBody;
use futures::{future, TryFutureExt};
use linkerd_error::Error;
use linkerd_stack::layer;
use linkerd_stack::{layer, Proxy, Service};
use std::task::{Context, Poll};

#[derive(Clone, Debug)]
Expand All @@ -15,9 +15,9 @@ impl<S> BoxResponse<S> {
}
}

impl<S, Req, B> tower::Service<Req> for BoxResponse<S>
impl<S, Req, B> Service<Req> for BoxResponse<S>
where
S: tower::Service<Req, Response = http::Response<B>>,
S: Service<Req, Response = http::Response<B>>,
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error> + 'static,
Expand All @@ -36,3 +36,22 @@ where
self.0.call(req).map_ok(|rsp| rsp.map(BoxBody::new))
}
}

impl<Req, B, S, P> Proxy<Req, S> for BoxResponse<P>
where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
S: Service<P::Request>,
P: Proxy<Req, S, Response = http::Response<B>>,
{
type Request = P::Request;
type Response = http::Response<BoxBody>;
type Error = P::Error;
type Future = future::MapOk<P::Future, fn(P::Response) -> Self::Response>;

#[inline]
fn proxy(&self, inner: &mut S, req: Req) -> Self::Future {
self.0.proxy(inner, req).map_ok(|rsp| rsp.map(BoxBody::new))
}
}
3 changes: 2 additions & 1 deletion linkerd/service-profiles/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ indexmap = "1.7"
linkerd-addr = { path = "../addr" }
linkerd-dns-name = { path = "../dns/name" }
linkerd-error = { path = "../error" }
linkerd2-proxy-api = { version = "0.2", features = ["destination", "client"] }
linkerd-http-box = { path = "../http-box" }
linkerd-proxy-api-resolve = { path = "../proxy/api-resolve" }
linkerd-stack = { path = "../stack" }
linkerd-tonic-watch = { path = "../tonic-watch" }
linkerd2-proxy-api = { version = "0.2", features = ["destination", "client"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = "1.5.4"
tokio = { version = "1", features = ["macros", "rt", "sync", "time"] }
Expand Down
51 changes: 29 additions & 22 deletions linkerd/service-profiles/src/http/route_request.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::{RequestMatch, Route};
use crate::{Profile, Receiver, ReceiverStream};
use futures::{future::ErrInto, prelude::*, ready};
use futures::{prelude::*, ready};
use linkerd_error::Error;
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, NewService, Param, Proxy};
use std::{
collections::HashMap,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use tracing::{debug, trace};
Expand All @@ -15,19 +17,16 @@ pub fn layer<M, N: Clone, R>(
) -> impl layer::Layer<M, Service = NewRouteRequest<M, N, R>> {
// This is saved so that the same `Arc`s are used and cloned instead of
// calling `Route::default()` every time.
let default = Route::default();
layer::mk(move |inner| NewRouteRequest {
inner,
new_route: new_route.clone(),
default: default.clone(),
_route: PhantomData,
})
}

pub struct NewRouteRequest<M, N, R> {
inner: M,
new_route: N,
default: Route,
_route: PhantomData<R>,
}

Expand All @@ -38,15 +37,13 @@ pub struct RouteRequest<T, S, N, R> {
new_route: N,
http_routes: Vec<(RequestMatch, Route)>,
proxies: HashMap<Route, R>,
default: R,
}

impl<M: Clone, N: Clone, R> Clone for NewRouteRequest<M, N, R> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
new_route: self.new_route.clone(),
default: self.default.clone(),
_route: self._route,
}
}
Expand All @@ -63,33 +60,41 @@ where
fn new_service(&mut self, target: T) -> Self::Service {
let rx = target.param();
let inner = self.inner.new_service(target.clone());
let default = self
.new_route
.new_service((self.default.clone(), target.clone()));
RouteRequest {
rx: rx.into(),
target,
inner,
default,
new_route: self.new_route.clone(),
http_routes: Vec::new(),
proxies: HashMap::new(),
}
}
}

impl<B, T, N, S, R> tower::Service<http::Request<B>> for RouteRequest<T, S, N, R>
impl<T, N, S, R> tower::Service<http::Request<BoxBody>> for RouteRequest<T, S, N, R>
where
B: Send + 'static,
T: Clone,
N: NewService<(Route, T), Service = R> + Clone,
R: Proxy<http::Request<B>, S>,
S: tower::Service<R::Request>,
R: Proxy<
http::Request<BoxBody>,
S,
Request = http::Request<BoxBody>,
Response = http::Response<BoxBody>,
>,
R::Future: Send + 'static,
S: tower::Service<http::Request<BoxBody>, Response = http::Response<BoxBody>>,
S::Future: Send + 'static,
S::Error: Into<Error>,
{
type Response = R::Response;
type Response = http::Response<BoxBody>;
type Error = Error;
type Future = ErrInto<R::Future, Error>;
type Future = Pin<
Box<
dyn std::future::Future<Output = Result<http::Response<BoxBody>, Error>>
+ Send
+ 'static,
>,
>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut update = None;
Expand Down Expand Up @@ -119,17 +124,19 @@ where
Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Into::into))
}

fn call(&mut self, req: http::Request<B>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
for (ref condition, ref route) in &self.http_routes {
if condition.is_match(&req) {
trace!(?condition, "Using configured route");
return self.proxies[route]
.proxy(&mut self.inner, req)
.err_into::<Error>();
return Box::pin(
self.proxies[route]
.proxy(&mut self.inner, req)
.err_into::<Error>(),
);
}
}

trace!("Using default route");
self.default.proxy(&mut self.inner, req).err_into::<Error>()
trace!("No routes matched");
Box::pin(self.inner.call(req).err_into::<Error>())
olix0r marked this conversation as resolved.
Show resolved Hide resolved
}
}