diff --git a/Cargo.lock b/Cargo.lock index 4e566af4a9..51ee6c9676 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -668,7 +668,6 @@ dependencies = [ "linkerd-duplex", "linkerd-errno", "linkerd-error", - "linkerd-error-metrics", "linkerd-error-respond", "linkerd-exp-backoff", "linkerd-http-classify", @@ -750,6 +749,7 @@ dependencies = [ "linkerd-tonic-watch", "linkerd-tracing", "linkerd2-proxy-api", + "parking_lot", "thiserror", "tokio", "tokio-test", @@ -801,6 +801,7 @@ dependencies = [ "linkerd-identity", "linkerd-io", "linkerd-tracing", + "parking_lot", "pin-project", "thiserror", "tokio", @@ -910,17 +911,6 @@ dependencies = [ "futures", ] -[[package]] -name = "linkerd-error-metrics" -version = "0.1.0" -dependencies = [ - "futures", - "linkerd-metrics", - "parking_lot", - "pin-project", - "tower", -] - [[package]] name = "linkerd-error-respond" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 28f38c61a7..22571ac9be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ members = [ "linkerd/duplex", "linkerd/error", "linkerd/errno", - "linkerd/error-metrics", "linkerd/error-respond", "linkerd/exp-backoff", "linkerd/http-box", diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index ab5663d21a..2c435f1431 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -7,9 +7,10 @@ use linkerd_app_core::{ serve, svc::{self, ExtractParam, InsertParam, Param}, tls, trace, - transport::{self, listen::Bind, ClientAddr, Local, Remote, ServerAddr}, - Error, + transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + Error, Result, }; +use linkerd_app_inbound as inbound; use std::{pin::Pin, time::Duration}; use thiserror::Error; use tokio::sync::mpsc; @@ -64,7 +65,7 @@ impl Config { bind: B, identity: Option, report: R, - metrics: metrics::Proxy, + metrics: inbound::Metrics, trace: trace::Handle, drain: drain::Watch, shutdown: mpsc::UnboundedSender<()>, @@ -79,11 +80,11 @@ impl Config { let (ready, latch) = crate::server::Readiness::new(); let admin = crate::server::Admin::new(report, ready, shutdown, trace); let admin = svc::stack(move |_| admin.clone()) - .push(metrics.http_endpoint.to_layer::()) + .push(metrics.proxy.http_endpoint.to_layer::()) + .push(metrics.http_errors.to_layer()) .push_on_service( svc::layers() - .push(metrics.http_errors.clone()) - .push(errors::layer()) + .push(errors::respond::layer()) .push(http::BoxResponse::layer()), ) .push(http::NewServeHttp::layer(Default::default(), drain.clone())) @@ -130,8 +131,10 @@ impl Config { ) .push(svc::BoxNewService::layer()) .push(detect::NewDetectService::layer(detect::Config::::from_timeout(DETECT_TIMEOUT))) - .push(transport::metrics::NewServer::layer(metrics.transport)) - .push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| { + .push(transport::metrics::NewServer::layer(metrics.proxy.transport)) + .push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| { + // TODO(ver): We should enforce policy here; but we need to permit liveness probes + // for destination pods to startup... Tcp { tls, client: addrs.param(), @@ -161,8 +164,7 @@ impl Param for Tcp { self.tls.clone(), self.addr.into(), // TODO(ver) enforce policies on the proxy's admin port. - Default::default(), - Default::default(), + metrics::ServerLabel("default:admin".to_string()), ) } } @@ -175,13 +177,28 @@ impl Param for Http { } } +impl Param for Http { + fn param(&self) -> OrigDstAddr { + OrigDstAddr(self.tcp.addr.into()) + } +} + +impl Param for Http { + fn param(&self) -> metrics::ServerLabel { + metrics::ServerLabel("default:admin".to_string()) + } +} + impl Param for Http { fn param(&self) -> metrics::EndpointLabels { metrics::InboundEndpointLabels { tls: self.tcp.tls.clone(), authority: None, target_addr: self.tcp.addr.into(), - policy: Default::default(), + policy: metrics::AuthzLabels { + server: self.param(), + authz: "default:all-unauthenticated".to_string(), + }, } .into() } diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index b6bf3c59a9..c537c57773 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -28,7 +28,6 @@ linkerd-detect = { path = "../../detect" } linkerd-duplex = { path = "../../duplex" } linkerd-errno = { path = "../../errno" } linkerd-error = { path = "../../error" } -linkerd-error-metrics = { path = "../../error-metrics" } linkerd-error-respond = { path = "../../error-respond" } linkerd-exp-backoff = { path = "../../exp-backoff" } linkerd-http-classify = { path = "../../http-classify" } diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 131ec29c2a..4e5cfb6e8c 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -79,7 +79,7 @@ impl Config { svc::stack(ConnectTcp::new(self.connect.keepalive)) .push(tls::Client::layer(identity)) - .push_timeout(self.connect.timeout) + .push_connect_timeout(self.connect.timeout) .push(self::client::layer()) .push_on_service(svc::MapErrLayer::new(Into::into)) .into_new_service() diff --git a/linkerd/app/core/src/errors/mod.rs b/linkerd/app/core/src/errors/mod.rs new file mode 100644 index 0000000000..66fd853064 --- /dev/null +++ b/linkerd/app/core/src/errors/mod.rs @@ -0,0 +1,44 @@ +pub mod respond; + +use linkerd_error::Error; +pub use linkerd_timeout::{FailFastError, ResponseTimeout}; +use thiserror::Error; + +#[derive(Debug, Error)] +#[error("connect timed out after {0:?}")] +pub(crate) struct ConnectTimeout(pub std::time::Duration); + +#[derive(Debug, Error)] +#[error("{source}")] +pub struct HttpError { + #[source] + source: Error, + http_status: http::StatusCode, + grpc_status: tonic::Code, +} + +impl HttpError { + pub fn bad_request(source: impl Into) -> Self { + Self { + source: source.into(), + http_status: http::StatusCode::BAD_REQUEST, + grpc_status: tonic::Code::InvalidArgument, + } + } + + pub fn forbidden(source: impl Into) -> Self { + Self { + source: source.into(), + http_status: http::StatusCode::FORBIDDEN, + grpc_status: tonic::Code::PermissionDenied, + } + } + + pub fn loop_detected(source: impl Into) -> Self { + Self { + source: source.into(), + http_status: http::StatusCode::LOOP_DETECTED, + grpc_status: tonic::Code::Aborted, + } + } +} diff --git a/linkerd/app/core/src/errors.rs b/linkerd/app/core/src/errors/respond.rs similarity index 59% rename from linkerd/app/core/src/errors.rs rename to linkerd/app/core/src/errors/respond.rs index a2b2d97aaf..443440241f 100644 --- a/linkerd/app/core/src/errors.rs +++ b/linkerd/app/core/src/errors/respond.rs @@ -1,72 +1,23 @@ -use crate::transport::DeniedUnauthorized; +use super::{ConnectTimeout, HttpError}; use http::{header::HeaderValue, StatusCode}; -use linkerd_errno::Errno; use linkerd_error::Error; -use linkerd_error_metrics::{self as error_metrics, RecordErrorLayer, Registry}; use linkerd_error_respond as respond; -pub use linkerd_error_respond::RespondLayer; -use linkerd_metrics::{metrics, Counter, FmtLabels, FmtMetrics}; use linkerd_proxy_http::{ClientHandle, HasH2Reason}; use linkerd_timeout::{FailFastError, ResponseTimeout}; -use linkerd_tls as tls; use pin_project::pin_project; -use std::fmt; -use std::pin::Pin; -use std::task::{Context, Poll}; -use thiserror::Error; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; use tonic::{self as grpc, Code}; use tracing::{debug, warn}; pub const L5D_PROXY_ERROR: &str = "l5d-proxy-error"; -metrics! { - inbound_http_errors_total: Counter { - "The total number of inbound HTTP requests that could not be processed due to a proxy error." - }, - - outbound_http_errors_total: Counter { - "The total number of outbound HTTP requests that could not be processed due to a proxy error." - } -} - pub fn layer() -> respond::RespondLayer { respond::RespondLayer::new(NewRespond(())) } -#[derive(Clone)] -pub struct Metrics { - inbound: Registry, - outbound: Registry, -} - -pub type MetricsLayer = RecordErrorLayer; - -/// Error metric labels. -#[derive(Copy, Clone, Debug)] -pub struct LabelError(()); - -#[derive(Copy, Clone, Debug, Error)] -#[error("{}", self.message)] -pub struct HttpError { - http: StatusCode, - grpc: Code, - message: &'static str, - reason: Reason, -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub enum Reason { - DispatchTimeout, - ResponseTimeout, - IdentityRequired, - Io(Option), - FailFast, - GatewayLoop, - NotFound, - Unauthorized, - Unexpected, -} - #[derive(Copy, Clone, Debug)] pub struct NewRespond(()); @@ -89,70 +40,7 @@ pub enum ResponseBody { const GRPC_CONTENT_TYPE: &str = "application/grpc"; -impl hyper::body::HttpBody for ResponseBody -where - B::Error: Into, -{ - type Data = B::Data; - type Error = B::Error; - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - match self.project() { - ResponseBodyProj::NonGrpc(inner) => inner.poll_data(cx), - ResponseBodyProj::Grpc { inner, trailers } => { - // should not be calling poll_data if we have set trailers derived from an error - assert!(trailers.is_none()); - match inner.poll_data(cx) { - Poll::Ready(Some(Err(error))) => { - let error = error.into(); - let mut error_trailers = http::HeaderMap::new(); - let code = set_grpc_status(&*error, &mut error_trailers); - debug!(%error, grpc.status = ?code, "Handling gRPC stream failure"); - *trailers = Some(error_trailers); - Poll::Ready(None) - } - data => data, - } - } - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - match self.project() { - ResponseBodyProj::NonGrpc(inner) => inner.poll_trailers(cx), - ResponseBodyProj::Grpc { inner, trailers } => match trailers.take() { - Some(t) => Poll::Ready(Ok(Some(t))), - None => inner.poll_trailers(cx), - }, - } - } - - fn is_end_stream(&self) -> bool { - match self { - Self::NonGrpc(inner) => inner.is_end_stream(), - Self::Grpc { inner, trailers } => trailers.is_none() && inner.is_end_stream(), - } - } - - fn size_hint(&self) -> http_body::SizeHint { - match self { - Self::NonGrpc(inner) => inner.size_hint(), - Self::Grpc { inner, .. } => inner.size_hint(), - } - } -} - -impl Default for ResponseBody { - fn default() -> ResponseBody { - ResponseBody::NonGrpc(B::default()) - } -} +// === impl NewRespond === impl respond::NewRespond, http::Response> for NewRespond @@ -186,6 +74,8 @@ impl } } +// === impl Respond === + impl respond::Respond> for Respond { type Response = http::Response>; @@ -251,13 +141,80 @@ impl respond::Respond hyper::body::HttpBody for ResponseBody +where + B::Error: Into, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.project() { + ResponseBodyProj::NonGrpc(inner) => inner.poll_data(cx), + ResponseBodyProj::Grpc { inner, trailers } => { + // should not be calling poll_data if we have set trailers derived from an error + assert!(trailers.is_none()); + match inner.poll_data(cx) { + Poll::Ready(Some(Err(error))) => { + let error = error.into(); + let mut error_trailers = http::HeaderMap::new(); + let code = set_grpc_status(&*error, &mut error_trailers); + debug!(%error, grpc.status = ?code, "Handling gRPC stream failure"); + *trailers = Some(error_trailers); + Poll::Ready(None) + } + data => data, + } + } + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + match self.project() { + ResponseBodyProj::NonGrpc(inner) => inner.poll_trailers(cx), + ResponseBodyProj::Grpc { inner, trailers } => match trailers.take() { + Some(t) => Poll::Ready(Ok(Some(t))), + None => inner.poll_trailers(cx), + }, + } + } + + fn is_end_stream(&self) -> bool { + match self { + Self::NonGrpc(inner) => inner.is_end_stream(), + Self::Grpc { inner, trailers } => trailers.is_none() && inner.is_end_stream(), + } + } + + fn size_hint(&self) -> http_body::SizeHint { + match self { + Self::NonGrpc(inner) => inner.size_hint(), + Self::Grpc { inner, .. } => inner.size_hint(), + } + } +} + +impl Default for ResponseBody { + fn default() -> ResponseBody { + ResponseBody::NonGrpc(B::default()) + } +} + +// === helpers === + fn set_l5d_proxy_error_header( - mut builder: http::response::Builder, + builder: http::response::Builder, error: &(dyn std::error::Error + 'static), ) -> http::response::Builder { - if let Some(HttpError { message, .. }) = error.downcast_ref::() { - builder.header(L5D_PROXY_ERROR, HeaderValue::from_static(message)) - } else if error.is::() { + if error.is::() { builder.header( L5D_PROXY_ERROR, HeaderValue::from_static("request timed out"), @@ -275,16 +232,12 @@ fn set_l5d_proxy_error_header( HeaderValue::from_static("service in fail-fast") }), ) - } else if error.is::() { + } else if let Some(HttpError { source, .. }) = error.downcast_ref() { builder.header( L5D_PROXY_ERROR, - HeaderValue::from_static("proxy dispatch timed out"), + HeaderValue::from_str(&source.to_string()) + .unwrap_or_else(|_| HeaderValue::from_static("an error occurred")), ) - } else if error.is::() { - if let Ok(msg) = HeaderValue::from_str(&error.to_string()) { - builder = builder.header(L5D_PROXY_ERROR, msg) - } - builder } else if let Some(source) = error.source() { set_l5d_proxy_error_header(builder, source) } else { @@ -299,18 +252,12 @@ fn set_http_status( builder: http::response::Builder, error: &(dyn std::error::Error + 'static), ) -> http::response::Builder { - if let Some(HttpError { http, .. }) = error.downcast_ref::() { - builder.status(*http) - } else if error.is::() { - builder.status(StatusCode::GATEWAY_TIMEOUT) - } else if error.is::() { + if error.is::() || error.is::() { builder.status(StatusCode::GATEWAY_TIMEOUT) } else if error.is::() { builder.status(StatusCode::SERVICE_UNAVAILABLE) - } else if error.is::() { - builder.status(StatusCode::SERVICE_UNAVAILABLE) - } else if error.is::() || error.is::() { - builder.status(StatusCode::FORBIDDEN) + } else if let Some(HttpError { http_status, .. }) = error.downcast_ref() { + builder.status(http_status) } else if let Some(source) = error.source() { set_http_status(builder, source) } else { @@ -325,11 +272,7 @@ fn set_grpc_status( const GRPC_STATUS: &str = "grpc-status"; const GRPC_MESSAGE: &str = "grpc-message"; - if let Some(HttpError { grpc, message, .. }) = error.downcast_ref::() { - headers.insert(GRPC_STATUS, code_header(*grpc)); - headers.insert(GRPC_MESSAGE, HeaderValue::from_static(message)); - *grpc - } else if error.is::() { + if error.is::() { let code = Code::DeadlineExceeded; headers.insert(GRPC_STATUS, code_header(code)); headers.insert(GRPC_MESSAGE, HeaderValue::from_static("request timed out")); @@ -345,33 +288,22 @@ fn set_grpc_status( }), ); code - } else if error.is::() { - let code = Code::Unavailable; - headers.insert(GRPC_STATUS, code_header(code)); - headers.insert( - GRPC_MESSAGE, - HeaderValue::from_static("proxy dispatch timed out"), - ); - code - } else if error.is::() { - let code = Code::PermissionDenied; - headers.insert(GRPC_STATUS, code_header(code)); - if let Ok(msg) = HeaderValue::from_str(&error.to_string()) { - headers.insert(GRPC_MESSAGE, msg); - } - code - } else if error.is::() { - let code = Code::FailedPrecondition; - headers.insert(GRPC_STATUS, code_header(code)); - if let Ok(msg) = HeaderValue::from_str(&error.to_string()) { - headers.insert(GRPC_MESSAGE, msg); - } - code } else if error.is::() { let code = Code::Unavailable; headers.insert(GRPC_STATUS, code_header(code)); headers.insert(GRPC_MESSAGE, HeaderValue::from_static("connection closed")); code + } else if let Some(HttpError { + source, + grpc_status, + .. + }) = error.downcast_ref() + { + headers.insert(GRPC_STATUS, code_header(*grpc_status)); + if let Ok(v) = HeaderValue::from_str(&*source.to_string()) { + headers.insert(GRPC_MESSAGE, v); + } + *grpc_status } else if let Some(source) = error.source() { set_grpc_status(source, headers) } else { @@ -406,153 +338,3 @@ fn code_header(code: grpc::Code) -> HeaderValue { Code::Unauthenticated => HeaderValue::from_static("16"), } } - -#[derive(Debug)] -pub struct IdentityRequired { - pub required: tls::client::ServerId, - pub found: Option, -} - -impl fmt::Display for IdentityRequired { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.found { - Some(ref found) => write!( - f, - "request required the identity '{}' but '{}' found", - self.required, found - ), - None => write!( - f, - "request required the identity '{}' but no identity found", - self.required - ), - } - } -} - -impl std::error::Error for IdentityRequired {} - -impl LabelError { - fn reason(err: &(dyn std::error::Error + 'static)) -> Reason { - if let Some(HttpError { reason, .. }) = err.downcast_ref::() { - *reason - } else if err.is::() { - Reason::ResponseTimeout - } else if err.is::() { - Reason::FailFast - } else if err.is::() { - Reason::DispatchTimeout - } else if err.is::() { - Reason::Unauthorized - } else if err.is::() { - Reason::IdentityRequired - } else if let Some(e) = err.downcast_ref::() { - Reason::Io(e.raw_os_error().map(Errno::from)) - } else if let Some(e) = err.source() { - Self::reason(e) - } else { - Reason::Unexpected - } - } -} - -impl error_metrics::LabelError for LabelError { - type Labels = Reason; - - fn label_error(&self, err: &Error) -> Self::Labels { - Self::reason(err.as_ref()) - } -} - -impl FmtLabels for Reason { - fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "message=\"{}\"", - match self { - Reason::FailFast => "failfast", - Reason::DispatchTimeout => "dispatch timeout", - Reason::ResponseTimeout => "response timeout", - Reason::IdentityRequired => "identity required", - Reason::GatewayLoop => "gateway loop", - Reason::NotFound => "not found", - Reason::Io(_) => "i/o", - Reason::Unauthorized => "unauthorized", - Reason::Unexpected => "unexpected", - } - )?; - - if let Reason::Io(Some(errno)) = self { - write!(f, ",errno=\"{}\"", errno)?; - } - - Ok(()) - } -} - -impl Default for Metrics { - fn default() -> Metrics { - Self { - inbound: Registry::new(inbound_http_errors_total), - outbound: Registry::new(outbound_http_errors_total), - } - } -} - -impl Metrics { - pub fn inbound(&self) -> MetricsLayer { - self.inbound.layer(LabelError(())) - } - - pub fn outbound(&self) -> MetricsLayer { - self.outbound.layer(LabelError(())) - } - - pub fn report(&self) -> impl FmtMetrics + Clone + Send { - self.inbound.clone().and_then(self.outbound.clone()) - } -} - -impl HttpError { - pub fn identity_required(message: &'static str) -> Self { - Self { - message, - http: StatusCode::FORBIDDEN, - grpc: Code::Unauthenticated, - reason: Reason::IdentityRequired, - } - } - - pub fn not_found(message: &'static str) -> Self { - Self { - message, - http: StatusCode::NOT_FOUND, - grpc: Code::NotFound, - reason: Reason::NotFound, - } - } - - pub fn gateway_loop() -> Self { - Self { - message: "gateway loop detected", - http: StatusCode::LOOP_DETECTED, - grpc: Code::Aborted, - reason: Reason::GatewayLoop, - } - } - - pub fn status(&self) -> StatusCode { - self.http - } -} - -#[derive(Debug)] -pub(crate) struct ConnectTimeout(pub std::time::Duration); - -impl fmt::Display for ConnectTimeout { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "connect timed out after {:?}", self.0) - } -} - -impl std::error::Error for ConnectTimeout {} diff --git a/linkerd/app/core/src/metrics/mod.rs b/linkerd/app/core/src/metrics/mod.rs index 7152380bef..9147b3f5c7 100644 --- a/linkerd/app/core/src/metrics/mod.rs +++ b/linkerd/app/core/src/metrics/mod.rs @@ -1,9 +1,6 @@ -mod tcp_accept_errors; - use crate::{ classify::{Class, SuccessOrFailure}, - control, dst, errors, http_metrics, http_metrics as metrics, opencensus, profiles, - stack_metrics, + control, dst, http_metrics, http_metrics as metrics, opencensus, profiles, stack_metrics, svc::Param, telemetry, tls, transport::{ @@ -12,9 +9,7 @@ use crate::{ }, }; use linkerd_addr::Addr; -use linkerd_metrics::FmtLabels; pub use linkerd_metrics::*; -use linkerd_server_policy as policy; use std::{ fmt::{self, Write}, net::SocketAddr, @@ -31,24 +26,21 @@ pub type HttpRouteRetry = http_metrics::Retries; pub type Stack = stack_metrics::Registry; +#[derive(Clone, Debug)] +pub struct Metrics { + pub proxy: Proxy, + pub control: ControlHttp, + pub opencensus: opencensus::metrics::Registry, +} + #[derive(Clone, Debug)] pub struct Proxy { pub http_route: HttpRoute, pub http_route_actual: HttpRoute, pub http_route_retry: HttpRouteRetry, pub http_endpoint: HttpEndpoint, - pub http_errors: errors::MetricsLayer, - pub stack: Stack, pub transport: transport::Metrics, - pub tcp_accept_errors: tcp_accept_errors::Registry, -} - -#[derive(Clone, Debug)] -pub struct Metrics { - pub inbound: Proxy, - pub outbound: Proxy, - pub control: ControlHttp, - pub opencensus: opencensus::metrics::Registry, + pub stack: Stack, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -68,13 +60,18 @@ pub struct InboundEndpointLabels { pub tls: tls::ConditionalServerTls, pub authority: Option, pub target_addr: SocketAddr, - pub policy: PolicyLabels, + pub policy: AuthzLabels, } -#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)] -pub struct PolicyLabels { - pub server: policy::Labels, - pub authz: policy::Labels, +/// A label referencing an inbound `Server` (i.e. for policy). +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub struct ServerLabel(pub String); + +/// Labels referencing an inbound `ServerAuthorization. +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub struct AuthzLabels { + pub server: ServerLabel, + pub authz: String, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -162,51 +159,33 @@ impl Metrics { (m, r.without_latencies()) }; - let http_errors = errors::Metrics::default(); - let stack = stack_metrics::Registry::default(); - let (transport, transport_report) = transport::metrics::new(retain_idle); + let (transport, transport_report) = transport::Metrics::new(retain_idle); - let inbound_tcp_accept_errors = tcp_accept_errors::Registry::inbound(); - let outbound_tcp_accept_errors = tcp_accept_errors::Registry::outbound(); + let proxy = Proxy { + http_endpoint, + http_route, + http_route_retry, + http_route_actual, + stack: stack.clone(), + transport, + }; let (opencensus, opencensus_report) = opencensus::metrics::new(); let metrics = Metrics { - inbound: Proxy { - http_endpoint: http_endpoint.clone(), - http_route: http_route.clone(), - http_route_actual: http_route_actual.clone(), - http_route_retry: http_route_retry.clone(), - http_errors: http_errors.inbound(), - stack: stack.clone(), - transport: transport.clone().into(), - tcp_accept_errors: inbound_tcp_accept_errors.clone(), - }, - outbound: Proxy { - http_endpoint, - http_route, - http_route_retry, - http_route_actual, - http_errors: http_errors.outbound(), - stack: stack.clone(), - transport: transport.into(), - tcp_accept_errors: outbound_tcp_accept_errors.clone(), - }, + proxy, control, opencensus, }; - let report = (http_errors.report()) - .and_then(endpoint_report) + let report = endpoint_report .and_then(route_report) .and_then(retry_report) .and_then(actual_report) .and_then(control_report) .and_then(transport_report) - .and_then(inbound_tcp_accept_errors) - .and_then(outbound_tcp_accept_errors) .and_then(opencensus_report) .and_then(stack) .and_then(process) @@ -291,19 +270,29 @@ impl FmtLabels for InboundEndpointLabels { write!(f, ",")?; } - (TargetAddr(self.target_addr), TlsAccept::from(&self.tls)).fmt_labels(f)?; - - for (k, v) in self.policy.server.iter() { - write!(f, ",srv_{}=\"{}\"", k, v)?; - } - for (k, v) in self.policy.authz.iter() { - write!(f, ",saz_{}=\"{}\"", k, v)?; - } + ( + (TargetAddr(self.target_addr), TlsAccept::from(&self.tls)), + &self.policy, + ) + .fmt_labels(f)?; Ok(()) } } +impl FmtLabels for ServerLabel { + fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "srv_name=\"{}\"", self.0) + } +} + +impl FmtLabels for AuthzLabels { + fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.server.fmt_labels(f)?; + write!(f, ",saz_name=\"{}\"", self.authz) + } +} + impl FmtLabels for OutboundEndpointLabels { fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if let Some(a) = self.authority.as_ref() { diff --git a/linkerd/app/core/src/metrics/tcp_accept_errors.rs b/linkerd/app/core/src/metrics/tcp_accept_errors.rs deleted file mode 100644 index 4309ed36c8..0000000000 --- a/linkerd/app/core/src/metrics/tcp_accept_errors.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::{ - metrics::{self, Counter, FmtMetrics}, - svc, - transport::{labels, DeniedUnauthorized, DeniedUnknownPort, OrigDstAddr}, -}; -use linkerd_error::Error; -use linkerd_error_metrics::{FmtLabels, LabelError, RecordError}; -use linkerd_tls::server::ServerTlsTimeoutError; -use parking_lot::Mutex; -use std::{collections::HashMap, fmt}; - -metrics::metrics! { - inbound_tcp_accept_errors_total: Counter { - "The total number of inbound TCP connections that could not be processed due to a proxy error." - }, - - outbound_tcp_accept_errors_total: Counter { - "The total number of outbound TCP connections that could not be processed due to a proxy error." - } -} - -#[derive(Clone, Debug)] -pub struct Registry { - scopes: metrics::SharedStore, - metric: linkerd_error_metrics::Metric, -} - -type Scope = Mutex>; - -type NewErrorMetrics = - metrics::NewMetrics>; - -#[derive(Clone, Copy, Debug, Default)] -pub struct LabelAcceptErrors(()); - -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -pub enum AcceptErrors { - TlsDetectTimeout, - Unauthorized, - Io, - Other, -} - -// === impl Registry === - -impl Registry { - pub fn inbound() -> Self { - Self { - metric: inbound_tcp_accept_errors_total, - scopes: Default::default(), - } - } - - pub fn outbound() -> Self { - Self { - metric: outbound_tcp_accept_errors_total, - scopes: Default::default(), - } - } - - pub fn layer>( - &self, - ) -> impl svc::Layer> + Clone { - metrics::NewMetrics::layer(self.scopes.clone()) - } -} - -impl FmtMetrics for Registry { - fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use metrics::FmtMetric; - let errors = self.scopes.lock(); - - self.metric.fmt_help(f)?; - for (OrigDstAddr(a), ms) in errors.iter() { - let ta = labels::TargetAddr(*a); - for (e, m) in ms.lock().iter() { - m.fmt_metric_labeled(f, self.metric.name, (ta, e))?; - } - } - - Ok(()) - } -} - -// === impl LabelAcceptErrors === - -impl LabelError for LabelAcceptErrors { - type Labels = AcceptErrors; - - fn label_error(&self, err: &Error) -> Self::Labels { - let mut curr: Option<&dyn std::error::Error> = Some(err.as_ref()); - while let Some(err) = curr { - if err.is::() { - return AcceptErrors::TlsDetectTimeout; - } else if err.is::() { - // We ignore the error code because we want all labels to be consistent. - return AcceptErrors::Io; - } else if err.is::() || err.is::() { - // If the port is unknown, the default policy is `deny`; so handle it as - // unauthorized. - return AcceptErrors::Unauthorized; - } - curr = err.source(); - } - - AcceptErrors::Other - } -} - -// === impl AcceptErrors === - -impl FmtLabels for AcceptErrors { - fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::TlsDetectTimeout => fmt::Display::fmt("error=\"tls_detect_timeout\"", f), - Self::Io => fmt::Display::fmt("error=\"io\"", f), - Self::Other => fmt::Display::fmt("error=\"other\"", f), - Self::Unauthorized => fmt::Display::fmt("error=\"unauthorized\"", f), - } - } -} diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index fa069032c9..d3280bf33d 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -175,10 +175,6 @@ impl Stack { self.push(stack::OnServiceLayer::new(layer)) } - pub fn push_timeout(self, timeout: Duration) -> Stack> { - self.push(tower::timeout::TimeoutLayer::new(timeout)) - } - /// Wraps the inner service with a response timeout such that timeout errors are surfaced as a /// `ConnectTimeout` error. /// @@ -187,7 +183,7 @@ impl Stack { self, timeout: Duration, ) -> Stack, impl FnOnce(Error) -> Error + Clone>> { - self.push_timeout(timeout) + self.push(tower::timeout::TimeoutLayer::new(timeout)) .push(MapErrLayer::new(move |err: Error| { if err.is::() { crate::errors::ConnectTimeout(timeout).into() diff --git a/linkerd/app/core/src/transport/labels.rs b/linkerd/app/core/src/transport/labels.rs index 6889d87990..7339d8fa40 100644 --- a/linkerd/app/core/src/transport/labels.rs +++ b/linkerd/app/core/src/transport/labels.rs @@ -1,7 +1,6 @@ -pub use crate::metrics::{Direction, OutboundEndpointLabels, PolicyLabels}; +pub use crate::metrics::{Direction, OutboundEndpointLabels, ServerLabel as PolicyServerLabel}; use linkerd_conditional::Conditional; use linkerd_metrics::FmtLabels; -use linkerd_server_policy as policy; use linkerd_tls as tls; use std::{fmt, net::SocketAddr}; @@ -22,7 +21,7 @@ pub struct ServerLabels { direction: Direction, tls: tls::ConditionalServerTls, target_addr: SocketAddr, - policy: Option, + policy: Option, } #[derive(Clone, Debug, Eq, PartialEq, Hash)] @@ -32,7 +31,7 @@ pub(crate) struct TlsAccept<'t>(&'t tls::ConditionalServerTls); pub(crate) struct TlsConnect<'t>(&'t tls::ConditionalClientTls); #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub(crate) struct TargetAddr(pub(crate) SocketAddr); +pub struct TargetAddr(pub SocketAddr); // === impl Key === @@ -40,14 +39,9 @@ impl Key { pub fn inbound_server( tls: tls::ConditionalServerTls, target_addr: SocketAddr, - server: policy::Labels, - authz: policy::Labels, + server: PolicyServerLabel, ) -> Self { - Self::Server(ServerLabels::inbound( - tls, - target_addr, - PolicyLabels { server, authz }, - )) + Self::Server(ServerLabels::inbound(tls, target_addr, server)) } pub fn outbound_server(target_addr: SocketAddr) -> Self { @@ -82,7 +76,7 @@ impl ServerLabels { fn inbound( tls: tls::ConditionalServerTls, target_addr: SocketAddr, - policy: PolicyLabels, + policy: PolicyServerLabel, ) -> Self { ServerLabels { direction: Direction::In, @@ -106,16 +100,11 @@ impl FmtLabels for ServerLabels { fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.direction.fmt_labels(f)?; f.write_str(",peer=\"src\",")?; - (TargetAddr(self.target_addr), TlsAccept(&self.tls)).fmt_labels(f)?; - - if let Some(policy) = self.policy.as_ref() { - for (k, v) in policy.server.iter() { - write!(f, ",srv_{}=\"{}\"", k, v)?; - } - for (k, v) in policy.authz.iter() { - write!(f, ",saz_{}=\"{}\"", k, v)?; - } - } + ( + (TargetAddr(self.target_addr), TlsAccept(&self.tls)), + self.policy.as_ref(), + ) + .fmt_labels(f)?; Ok(()) } @@ -205,21 +194,14 @@ mod tests { negotiated_protocol: None, }), ([192, 0, 2, 4], 40000).into(), - PolicyLabels { - server: vec![("name".to_string(), "testserver".to_string())] - .into_iter() - .collect(), - authz: vec![("name".to_string(), "testauthz".to_string())] - .into_iter() - .collect(), - }, + PolicyServerLabel("testserver".to_string()), ); assert_eq!( labels.to_string(), "direction=\"inbound\",peer=\"src\",\ target_addr=\"192.0.2.4:40000\",target_ip=\"192.0.2.4\",target_port=\"40000\",\ tls=\"true\",client_id=\"foo.id.example.com\",\ - srv_name=\"testserver\",saz_name=\"testauthz\"" + srv_name=\"testserver\"" ); } } diff --git a/linkerd/app/core/src/transport/mod.rs b/linkerd/app/core/src/transport/mod.rs index f890e06794..e5127b476f 100644 --- a/linkerd/app/core/src/transport/mod.rs +++ b/linkerd/app/core/src/transport/mod.rs @@ -2,28 +2,16 @@ pub use linkerd_proxy_transport::*; use linkerd_stack::{ExtractParam, Param}; pub use linkerd_transport_metrics as metrics; use std::sync::Arc; -use thiserror::Error; pub mod labels; #[derive(Clone, Debug)] pub struct Metrics(metrics::Registry); -#[derive(Clone, Debug, Error)] -#[error("connection denied on unknown port {0}")] -pub struct DeniedUnknownPort(pub u16); - -#[derive(Debug, Error)] -#[error("unauthorized connection from {client_addr} with identity {tls:?} to {dst_addr}")] -pub struct DeniedUnauthorized { - pub client_addr: Remote, - pub dst_addr: OrigDstAddr, - pub tls: linkerd_tls::ConditionalServerTls, -} - -impl From> for Metrics { - fn from(reg: metrics::Registry) -> Self { - Self(reg) +impl Metrics { + pub fn new(retain_idle: std::time::Duration) -> (Self, metrics::Report) { + let (reg, report) = metrics::new(retain_idle); + (Self(reg), report) } } diff --git a/linkerd/app/gateway/src/gateway.rs b/linkerd/app/gateway/src/gateway.rs index bd26ae4b7b..ac0a1f841a 100644 --- a/linkerd/app/gateway/src/gateway.rs +++ b/linkerd/app/gateway/src/gateway.rs @@ -8,6 +8,7 @@ use linkerd_app_core::{ svc::{self, layer}, tls, Error, NameAddr, }; +use linkerd_app_inbound::{GatewayDomainInvalid, GatewayIdentityRequired, GatewayLoop}; use linkerd_app_outbound as outbound; use std::{ future::Future, @@ -135,6 +136,7 @@ where type Error = Error; type Future = ResponseFuture; + #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self { Self::Outbound { outbound, .. } => outbound.poll_ready(cx).map_err(Into::into), @@ -160,7 +162,9 @@ where if let Some(by) = fwd_by(forwarded) { tracing::info!(%forwarded); if by == local_id.as_ref() { - return Box::pin(future::err(HttpError::gateway_loop().into())); + return Box::pin(future::err( + HttpError::loop_detected(GatewayLoop).into(), + )); } } } @@ -179,7 +183,7 @@ where None => { warn!("Request missing ClientId extension"); return Box::pin(future::err( - HttpError::identity_required("no identity").into(), + HttpError::forbidden(GatewayIdentityRequired).into(), )); } }; @@ -201,9 +205,11 @@ where Box::pin(outbound.call(request).map_err(Into::into)) } Self::NoIdentity => Box::pin(future::err( - HttpError::identity_required("no identity").into(), + HttpError::forbidden(GatewayIdentityRequired).into(), + )), + Self::BadDomain(..) => Box::pin(future::err( + HttpError::bad_request(GatewayDomainInvalid).into(), )), - Self::BadDomain(..) => Box::pin(future::err(HttpError::not_found("bad domain").into())), } } } diff --git a/linkerd/app/gateway/src/lib.rs b/linkerd/app/gateway/src/lib.rs index c6faed520b..60202b52d8 100644 --- a/linkerd/app/gateway/src/lib.rs +++ b/linkerd/app/gateway/src/lib.rs @@ -17,12 +17,13 @@ use linkerd_app_core::{ }, svc::{self, Param}, tls, + transport::{ClientAddr, OrigDstAddr, Remote}, transport_header::SessionProtocol, Error, Infallible, NameAddr, NameMatch, }; use linkerd_app_inbound::{ direct::{ClientInfo, GatewayConnection, GatewayTransportHeader, Legacy}, - Inbound, + policy, Inbound, }; use linkerd_app_outbound::{self as outbound, Outbound}; use std::{ @@ -41,6 +42,7 @@ pub struct Config { struct HttpLegacy { client: ClientInfo, version: http::Version, + policy: policy::AllowPolicy, } #[derive(Clone, Debug)] @@ -48,6 +50,7 @@ struct HttpTransportHeader { target: NameAddr, client: ClientInfo, version: http::Version, + policy: policy::AllowPolicy, } #[derive(Clone, Debug)] @@ -96,7 +99,7 @@ where dispatch_timeout, .. } = inbound.config().proxy.clone(); - let local_id = inbound.runtime().identity.as_ref().map(|l| l.id().clone()); + let local_id = inbound.identity().map(|l| l.id().clone()); // For each gatewayed connection that is *not* HTTP, use the target from the // transport header to lookup a service profile. If the profile includes a @@ -163,8 +166,7 @@ where svc::layers() .push( inbound - .runtime() - .metrics + .proxy_metrics() .stack .layer(metrics::StackLabels::inbound("tcp", "gateway")), ) @@ -199,8 +201,7 @@ where svc::layers() .push( inbound - .runtime() - .metrics + .proxy_metrics() .stack .layer(metrics::StackLabels::inbound("http", "gateway")), ) @@ -225,7 +226,8 @@ where .clone() .with_stack( http.clone() - .push(svc::NewRouter::layer(RouteHttp)) + .push(svc::NewRouter::layer(|(_, target)| RouteHttp(target))) + .push(inbound.authorize_http()) .push_http_insert_target::(), ) .push_http_server() @@ -239,10 +241,12 @@ where // When a transported connection is received, use the header's target to // drive routing. inbound + .clone() .with_stack( // A router is needed so that we use each request's HTTP version - // (i.e. after serverside orig-proto downgrading). - http.push(svc::NewRouter::layer(RouteHttp)) + // (i.e. after server-side orig-proto downgrading). + http.push(svc::NewRouter::layer(|(_, target)| RouteHttp(target))) + .push(inbound.authorize_http()) .push_http_insert_target::(), ) .push_http_server() @@ -250,23 +254,22 @@ where .push_on_service(svc::BoxService::layer()) .push(svc::BoxNewService::layer()) .push_switch( - |GatewayTransportHeader { - target, - protocol, - client, - .. - }| match protocol { + |gth: GatewayTransportHeader| match gth.protocol { Some(proto) => Ok(svc::Either::A(HttpTransportHeader { - target, - client, + target: gth.target, + client: gth.client, + policy: gth.policy, version: match proto { SessionProtocol::Http1 => http::Version::Http1, SessionProtocol::Http2 => http::Version::H2, }, })), - None => Ok::<_, Infallible>(svc::Either::B(target)), + None => Ok::<_, Infallible>(svc::Either::B(gth)), }, - tcp.push_on_service(svc::BoxService::layer()) + tcp.push_map_target(|(_permit, gth): (_, GatewayTransportHeader)| gth.target) + .push(inbound.authorize_tcp()) + .check_new_service::() + .push_on_service(svc::BoxService::layer()) .push(svc::BoxNewService::layer()) .into_inner(), ) @@ -308,6 +311,39 @@ impl Param for HttpTransportHeader { } } +impl Param for HttpTransportHeader { + fn param(&self) -> OrigDstAddr { + self.client.local_addr + } +} + +impl Param> for HttpTransportHeader { + fn param(&self) -> Remote { + self.client.client_addr + } +} + +impl Param for HttpTransportHeader { + fn param(&self) -> tls::ConditionalServerTls { + tls::ConditionalServerTls::Some(tls::ServerTls::Established { + client_id: Some(self.client.client_id.clone()), + negotiated_protocol: self.client.alpn.clone(), + }) + } +} + +impl Param for HttpTransportHeader { + fn param(&self) -> policy::AllowPolicy { + self.policy.clone() + } +} + +impl Param for HttpTransportHeader { + fn param(&self) -> policy::ServerLabel { + self.policy.server_label() + } +} + // === impl HttpLegacy === impl> TryFrom<(Result, E>, Legacy)> for HttpLegacy { @@ -320,6 +356,7 @@ impl> TryFrom<(Result, E>, Legacy)> for Htt Ok(Some(version)) => Ok(Self { version, client: gateway.client, + policy: gateway.policy, }), Ok(None) => Err(RefusedNoTarget(()).into()), Err(e) => Err(e.into()), @@ -327,12 +364,6 @@ impl> TryFrom<(Result, E>, Legacy)> for Htt } } -impl From<(http::Version, ClientInfo)> for HttpLegacy { - fn from((version, client): (http::Version, ClientInfo)) -> Self { - Self { version, client } - } -} - impl Param for HttpLegacy { fn param(&self) -> http::normalize_uri::DefaultAuthority { http::normalize_uri::DefaultAuthority(None) @@ -351,12 +382,45 @@ impl Param for HttpLegacy { } } +impl Param for HttpLegacy { + fn param(&self) -> OrigDstAddr { + self.client.local_addr + } +} + +impl Param> for HttpLegacy { + fn param(&self) -> Remote { + self.client.client_addr + } +} + impl Param for HttpLegacy { fn param(&self) -> tls::ClientId { self.client.client_id.clone() } } +impl Param for HttpLegacy { + fn param(&self) -> tls::ConditionalServerTls { + tls::ConditionalServerTls::Some(tls::ServerTls::Established { + client_id: Some(self.client.client_id.clone()), + negotiated_protocol: self.client.alpn.clone(), + }) + } +} + +impl Param for HttpLegacy { + fn param(&self) -> policy::AllowPolicy { + self.policy.clone() + } +} + +impl Param for HttpLegacy { + fn param(&self) -> policy::ServerLabel { + self.policy.server_label() + } +} + // === impl RouteHttp === impl svc::stack::RecognizeRoute> for RouteHttp { @@ -374,12 +438,8 @@ impl svc::stack::RecognizeRoute> for RouteHttp { fn recognize(&self, req: &http::Request) -> Result { let version = req.version().try_into()?; - - if let Some(a) = req.uri().authority() { - let target = NameAddr::from_authority_with_default_port(a, 80)?; - return Ok(HttpTarget { target, version }); - } - - Err(RefusedNoTarget(()).into()) + let authority = req.uri().authority().ok_or(RefusedNoTarget(()))?; + let target = NameAddr::from_authority_with_default_port(authority, 80)?; + Ok(HttpTarget { target, version }) } } diff --git a/linkerd/app/gateway/src/tests.rs b/linkerd/app/gateway/src/tests.rs index 234c0a2fa1..cc0f6ccf0e 100644 --- a/linkerd/app/gateway/src/tests.rs +++ b/linkerd/app/gateway/src/tests.rs @@ -3,8 +3,9 @@ use linkerd_app_core::{ dns, errors::HttpError, identity as id, profiles, proxy::http, svc::NewService, tls, Error, NameAddr, NameMatch, }; +use linkerd_app_inbound::{GatewayDomainInvalid, GatewayIdentityRequired, GatewayLoop}; use linkerd_app_test as support; -use std::str::FromStr; +use std::{error::Error as _, str::FromStr}; use tower::util::ServiceExt; use tower_test::mock; @@ -46,15 +47,13 @@ async fn bad_domain() { suffix: "bad.example.com", ..Default::default() }; - let status = test - .with_default_profile() - .run() - .await - .unwrap_err() - .downcast_ref::() + let e = test.with_default_profile().run().await.unwrap_err(); + assert!(e + .downcast::() .unwrap() - .status(); - assert_eq!(status, http::StatusCode::NOT_FOUND); + .source() + .unwrap() + .is::()); } #[tokio::test] @@ -63,15 +62,13 @@ async fn no_identity() { client_id: None, ..Default::default() }; - let status = test - .with_default_profile() - .run() - .await - .unwrap_err() - .downcast_ref::() + let e = test.with_default_profile().run().await.unwrap_err(); + assert!(e + .downcast::() + .unwrap() + .source() .unwrap() - .status(); - assert_eq!(status, http::StatusCode::FORBIDDEN); + .is::()); } #[tokio::test] @@ -82,15 +79,13 @@ async fn forward_loop() { ), ..Default::default() }; - let status = test - .with_default_profile() - .run() - .await - .unwrap_err() - .downcast_ref::() + let e = test.with_default_profile().run().await.unwrap_err(); + assert!(e + .downcast::() + .unwrap() + .source() .unwrap() - .status(); - assert_eq!(status, http::StatusCode::LOOP_DETECTED); + .is::()); } struct Test { diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index dcbcdab72d..19f64153e0 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -17,6 +17,7 @@ linkerd-app-core = { path = "../core" } linkerd-server-policy = { path = "../../server-policy" } linkerd-tonic-watch = { path = "../../tonic-watch" } linkerd2-proxy-api = { version = "0.2", features = ["client", "inbound"] } +parking_lot = "0.11" thiserror = "1.0" tokio = { version = "1", features = ["sync"] } tonic = { version = "0.5", default-features = false } diff --git a/linkerd/app/inbound/src/accept.rs b/linkerd/app/inbound/src/accept.rs index 615ec59e41..48d221a6df 100644 --- a/linkerd/app/inbound/src/accept.rs +++ b/linkerd/app/inbound/src/accept.rs @@ -66,7 +66,9 @@ impl Inbound { }, direct, ) - .push(rt.metrics.tcp_accept_errors.layer()) + .check_new_service::() + .push(rt.metrics.tcp_errors.to_layer()) + .check_new_service::() .instrument(|t: &T| { let OrigDstAddr(addr) = t.param(); info_span!("server", port = addr.port()) @@ -126,9 +128,9 @@ mod tests { authorizations: vec![Authorization { authentication: Authentication::Unauthenticated, networks: vec![Default::default()], - labels: Default::default(), + name: "testsaz".to_string(), }], - labels: Default::default(), + name: "testsrv".to_string(), }, None, ); diff --git a/linkerd/app/inbound/src/detect.rs b/linkerd/app/inbound/src/detect.rs index e37a8603e5..0cf3ad0eff 100644 --- a/linkerd/app/inbound/src/detect.rs +++ b/linkerd/app/inbound/src/detect.rs @@ -1,5 +1,5 @@ use crate::{ - policy::{AllowPolicy, Permit}, + policy::{self, AllowPolicy, Permit, Protocol, ServerLabel}, Inbound, }; use linkerd_app_core::{ @@ -9,11 +9,10 @@ use linkerd_app_core::{ transport::{ self, addrs::{ClientAddr, OrigDstAddr, Remote}, - DeniedUnauthorized, ServerAddr, + ServerAddr, }, Error, Infallible, }; -use linkerd_server_policy::Protocol; use std::{fmt::Debug, time}; #[derive(Clone, Debug, PartialEq, Eq)] @@ -74,8 +73,7 @@ impl Inbound { NSvc::Error: Into, NSvc::Future: Send, F: svc::NewService + Clone + Send + Sync + Unpin + 'static, - FSvc: svc::Service + Send + 'static, - FSvc::Error: Into, + FSvc: svc::Service + Send + 'static, FSvc::Future: Send, { self.push_detect_http(forward.clone()) @@ -98,61 +96,73 @@ impl Inbound { NSvc::Error: Into, NSvc::Future: Send, F: svc::NewService + Clone + Send + Sync + Unpin + 'static, - FSvc: svc::Service + Send + 'static, - FSvc::Error: Into, + FSvc: svc::Service + Send + 'static, FSvc::Future: Send, { - const TLS_PORT_SKIPPED: tls::ConditionalServerTls = - tls::ConditionalServerTls::None(tls::NoServerTls::PortSkipped); - self.map_stack(|cfg, rt, detect| { + let forward = svc::stack(forward) + .push_map_target(Forward::from) + .push(policy::NewAuthorizeTcp::layer(rt.metrics.tcp_authz.clone())); + let detect_timeout = cfg.proxy.detect_protocol_timeout; detect + .check_new_service::() .push_switch( // Ensure that the connection is authorized before proceeding with protocol // detection. - |(tls, t): (tls::ConditionalServerTls, T)| -> Result<_, Error> { + |(status, t): (tls::ConditionalServerTls, T)| -> Result<_, Infallible> { let policy: AllowPolicy = t.param(); + let protocol = policy.protocol(); + let tls = Tls { + client_addr: t.param(), + orig_dst_addr: t.param(), + status, + policy, + }; // If the port is configured to support application TLS, it may have also // been wrapped in mesh identity. In any case, we don't actually validate // whether app TLS was employed, but we use this as a signal that we should // not perform additional protocol detection. - if policy.protocol() == Protocol::Tls { - let permit = policy.check_authorized(t.param(), &tls)?; - return Ok(svc::Either::B(Forward::mk(&t, tls, permit))); + if protocol == Protocol::Tls { + return Ok(svc::Either::B(tls)); } - Ok(svc::Either::A(Tls { - client_addr: t.param(), - orig_dst_addr: t.param(), - status: tls, - policy, - })) + Ok(svc::Either::A(tls)) }, - svc::stack(forward.clone()) + forward + .clone() .push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new)) .into_inner(), ) + .check_new_service::<(tls::ConditionalServerTls, T), _>() .push(tls::NewDetectTls::layer(TlsParams { timeout: tls::server::Timeout(detect_timeout), identity: rt.identity.clone(), })) + .check_new_service::() .push_switch( // If this port's policy indicates that authentication is not required and // detection should be skipped, use the TCP stack directly. - |t: T| -> Result<_, DeniedUnauthorized> { + |t: T| -> Result<_, Infallible> { let policy: AllowPolicy = t.param(); if policy.protocol() == Protocol::Opaque { - let permit = policy.check_authorized(t.param(), &TLS_PORT_SKIPPED)?; - return Ok(svc::Either::B(Forward::mk(&t, TLS_PORT_SKIPPED, permit))); + const TLS_PORT_SKIPPED: tls::ConditionalServerTls = + tls::ConditionalServerTls::None(tls::NoServerTls::PortSkipped); + return Ok(svc::Either::B(Tls { + client_addr: t.param(), + orig_dst_addr: t.param(), + status: TLS_PORT_SKIPPED, + policy, + })); } Ok(svc::Either::A(t)) }, - svc::stack(forward) + forward .push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new)) .into_inner(), ) + .check_new_service::() .push_on_service(svc::BoxService::layer()) .push(svc::BoxNewService::layer()) }) @@ -171,8 +181,7 @@ impl Inbound { NSvc::Error: Into, NSvc::Future: Send, F: svc::NewService + Clone + Send + Sync + Unpin + 'static, - FSvc: svc::Service + Send + 'static, - FSvc::Error: Into, + FSvc: svc::Service + Send + 'static, FSvc::Future: Send, { self.map_stack(|cfg, rt, http| { @@ -182,37 +191,25 @@ impl Inbound { .clone() .push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new)) .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .check_new_service::>() .push_switch( - |(http, Detect { tls, .. })| -> Result<_, Error> { + |(http, Detect { tls, .. })| -> Result<_, Infallible> { match http { Some(http) => Ok(svc::Either::A(Http { http, tls })), // When HTTP detection fails, forward the connection to the application as // an opaque TCP stream. - None => { - let Tls { - client_addr, - orig_dst_addr, - status: tls, - policy, - } = tls; - let permit = policy.check_authorized(client_addr, &tls)?; - Ok(svc::Either::B(Forward { - client_addr, - orig_dst_addr, - tls, - permit, - })) - } + None => Ok(svc::Either::B(tls)), } }, svc::stack(forward) .push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new)) .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) + .push_map_target(Forward::from) + .push(policy::NewAuthorizeTcp::layer(rt.metrics.tcp_authz.clone())) .into_inner(), ) .push(svc::BoxNewService::layer()) @@ -222,7 +219,7 @@ impl Inbound { http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new)) .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .check_new_service::() .push_switch( @@ -257,15 +254,12 @@ impl Inbound { // === impl Forward === -impl Forward { - fn mk(t: &T, tls: tls::ConditionalServerTls, permit: Permit) -> Self - where - T: svc::Param> + svc::Param, - { +impl From<(Permit, Tls)> for Forward { + fn from((permit, tls): (Permit, Tls)) -> Self { Self { - client_addr: t.param(), - orig_dst_addr: t.param(), - tls, + client_addr: tls.client_addr, + orig_dst_addr: tls.orig_dst_addr, + tls: tls.status, permit, } } @@ -282,12 +276,37 @@ impl svc::Param for Forward { transport::labels::Key::inbound_server( self.tls.clone(), self.orig_dst_addr.into(), - self.permit.server_labels.clone(), - self.permit.authz_labels.clone(), + self.permit.labels.server.clone(), ) } } +// === impl Tls === + +impl svc::Param for Tls { + fn param(&self) -> AllowPolicy { + self.policy.clone() + } +} + +impl svc::Param for Tls { + fn param(&self) -> OrigDstAddr { + self.orig_dst_addr + } +} + +impl svc::Param> for Tls { + fn param(&self) -> Remote { + self.client_addr + } +} + +impl svc::Param for Tls { + fn param(&self) -> tls::ConditionalServerTls { + self.status.clone() + } +} + // === impl ConfigureHttpDetect === impl svc::ExtractParam, Detect> for ConfigureHttpDetect { @@ -304,6 +323,12 @@ impl svc::Param for Http { } } +impl svc::Param for Http { + fn param(&self) -> OrigDstAddr { + self.tls.orig_dst_addr + } +} + impl svc::Param> for Http { fn param(&self) -> Remote { Remote(ServerAddr(self.tls.orig_dst_addr.into())) @@ -352,13 +377,18 @@ impl svc::Param for Http { } } +impl svc::Param for Http { + fn param(&self) -> ServerLabel { + self.tls.policy.server_label() + } +} + impl svc::Param for Http { fn param(&self) -> transport::labels::Key { transport::labels::Key::inbound_server( self.tls.status.clone(), self.tls.orig_dst_addr.into(), - self.tls.policy.server_labels(), - Default::default(), + self.tls.policy.server_label(), ) } } @@ -412,9 +442,9 @@ mod tests { authorizations: vec![Authorization { authentication: Authentication::Unauthenticated, networks: vec![client_addr().ip().into()], - labels: Default::default(), + name: "testsaz".to_string(), }], - labels: Default::default(), + name: "testsrv".to_string(), }, ); allow diff --git a/linkerd/app/inbound/src/direct.rs b/linkerd/app/inbound/src/direct.rs index 2ded34c881..a9fe4ade3d 100644 --- a/linkerd/app/inbound/src/direct.rs +++ b/linkerd/app/inbound/src/direct.rs @@ -37,24 +37,24 @@ pub(crate) struct Local { /// Gateway connections come in two variants: those with a transport header, and /// legacy connections, without a transport header. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub enum GatewayConnection { TransportHeader(GatewayTransportHeader), Legacy(Legacy), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct GatewayTransportHeader { pub target: NameAddr, pub protocol: Option, pub client: ClientInfo, - permit: policy::Permit, + pub policy: policy::AllowPolicy, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct Legacy { pub client: ClientInfo, - permit: policy::Permit, + pub policy: policy::AllowPolicy, } /// Client connections *must* have an identity. @@ -113,7 +113,7 @@ impl Inbound { inner .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .instrument(|_: &_| debug_span!("opaque")) .check_new_service::() @@ -156,19 +156,12 @@ impl Inbound { // connection is a gateway connection. We check the _gateway // address's_ policy to determine whether the client is // authorized to use this gateway. - let allow = policies.check_policy(client.local_addr)?; - let tls = tls::ConditionalServerTls::Some( - tls::ServerTls::Established { - client_id: Some(client.client_id.clone()), - negotiated_protocol: client.alpn.clone(), - }, - ); - let permit = allow.check_authorized(client.client_addr, &tls)?; + let policy = policies.check_policy(client.local_addr)?; Ok(svc::Either::B(GatewayTransportHeader { target: NameAddr::from((name, port)), protocol, client, - permit, + policy, })) } TransportHeader { @@ -185,7 +178,7 @@ impl Inbound { .push_on_service(svc::MapTargetLayer::new(io::EitherIo::Left)) .push_map_target(GatewayConnection::TransportHeader) .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .instrument( |g: &GatewayTransportHeader| info_span!("gateway", dst = %g.target), @@ -207,23 +200,17 @@ impl Inbound { // be receiving a gateway connection from an older client. We check the // gateway address's policy to determine whether the client is // authorized to use this gateway. - let allow = policies.check_policy(client.local_addr)?; - let tls = - tls::ConditionalServerTls::Some(tls::ServerTls::Established { - client_id: Some(client.client_id.clone()), - negotiated_protocol: client.alpn.clone(), - }); - let permit = allow.check_authorized(client.client_addr, &tls)?; - Ok(svc::Either::B(Legacy { client, permit })) + let policy = policies.check_policy(client.local_addr)?; + Ok(svc::Either::B(Legacy { client, policy })) } }, - // TODO: Remove this after we have at least one stable release out - // with transport header support. + // TODO(ver): Remove this after we have another stable release out with + // transport header support. svc::stack(gateway) .push_map_target(GatewayConnection::Legacy) .push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right)) .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .instrument(|_: &Legacy| info_span!("gateway", legacy = true)) .check_new_service::>() @@ -295,8 +282,7 @@ impl Param for Local { negotiated_protocol: None, }), ([127, 0, 0, 1], self.port).into(), - self.permit.server_labels.clone(), - self.permit.authz_labels.clone(), + self.permit.labels.server.clone(), ) } } @@ -306,17 +292,40 @@ impl Param for Local { impl Param for GatewayTransportHeader { fn param(&self) -> transport::labels::Key { transport::labels::Key::inbound_server( - tls::ConditionalServerTls::Some(tls::ServerTls::Established { - client_id: Some(self.client.client_id.clone()), - negotiated_protocol: self.client.alpn.clone(), - }), + self.param(), self.client.local_addr.into(), - self.permit.server_labels.clone(), - self.permit.authz_labels.clone(), + self.policy.server_label(), ) } } +impl Param for GatewayTransportHeader { + fn param(&self) -> policy::AllowPolicy { + self.policy.clone() + } +} + +impl Param for GatewayTransportHeader { + fn param(&self) -> OrigDstAddr { + self.client.local_addr + } +} + +impl Param> for GatewayTransportHeader { + fn param(&self) -> Remote { + self.client.client_addr + } +} + +impl Param for GatewayTransportHeader { + fn param(&self) -> tls::ConditionalServerTls { + tls::ConditionalServerTls::Some(tls::ServerTls::Established { + client_id: Some(self.client.client_id.clone()), + negotiated_protocol: self.client.alpn.clone(), + }) + } +} + // === impl Legacy === impl Param for Legacy { @@ -327,8 +336,7 @@ impl Param for Legacy { negotiated_protocol: self.client.alpn.clone(), }), self.client.local_addr.into(), - self.permit.server_labels.clone(), - self.permit.authz_labels.clone(), + self.policy.server_label(), ) } } diff --git a/linkerd/app/inbound/src/http/mod.rs b/linkerd/app/inbound/src/http/mod.rs index 089682194d..3162063909 100644 --- a/linkerd/app/inbound/src/http/mod.rs +++ b/linkerd/app/inbound/src/http/mod.rs @@ -216,15 +216,21 @@ pub mod fuzz { authorizations: vec![policy::Authorization { authentication: policy::Authentication::Unauthenticated, networks: vec![std::net::IpAddr::from([192, 0, 2, 3]).into()], - labels: None.into_iter().collect(), + name: "testsaz".to_string(), }], - labels: Default::default(), + name: "testsrv".to_string(), }, ); policy } } + impl svc::Param for Target { + fn param(&self) -> policy::ServerLabel { + policy::ServerLabel("testsrv".to_string()) + } + } + impl svc::Param for Target { fn param(&self) -> http::normalize_uri::DefaultAuthority { http::normalize_uri::DefaultAuthority(None) diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 814b9d11bb..7c3bef8d93 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -25,7 +25,7 @@ struct LogicalPerRequest { client: Remote, server: Remote, tls: tls::ConditionalServerTls, - policy: policy::AllowPolicy, + permit: policy::Permit, } /// Describes a logical request target. @@ -85,7 +85,7 @@ impl Inbound { // Creates HTTP clients for each inbound port & HTTP settings. let http = connect .push(svc::stack::BoxFuture::layer()) - .push(transport::metrics::Client::layer(rt.metrics.transport.clone())) + .push(transport::metrics::Client::layer(rt.metrics.proxy.transport.clone())) .push(http::client::layer( config.proxy.connect.h1_settings, config.proxy.connect.h2_settings, @@ -100,6 +100,7 @@ impl Inbound { // Records metrics for each `Logical`. .push( rt.metrics + .proxy .http_endpoint .to_layer::(), ) @@ -123,7 +124,7 @@ impl Inbound { .push_on_service(http::BoxRequest::layer()) // Records per-route metrics. .push( - rt.metrics + rt.metrics.proxy .http_route .to_layer::(), ) @@ -197,7 +198,7 @@ impl Inbound { .check_new_service::>() .push_on_service( svc::layers() - .push(rt.metrics.stack.layer(stack_labels("http", "logical"))) + .push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical"))) .push(svc::FailFast::layer( "HTTP Logical", config.proxy.dispatch_timeout, @@ -221,12 +222,13 @@ impl Inbound { // dispatches the request. NewRouter moves the NewService into the service type, so // minimize it's type footprint with a Box. .push(svc::BoxNewService::layer()) - .push(svc::NewRouter::layer(|t: T| LogicalPerRequest { + .push(svc::NewRouter::layer(|(permit, t): (_, T)| LogicalPerRequest { client: t.param(), server: t.param(), tls: t.param(), - policy: t.param(), + permit, })) + .push(policy::NewAuthorizeHttp::layer(rt.metrics.http_authz.clone())) // Used by tap. .push_http_insert_target::() .push_http_insert_target::>() @@ -262,20 +264,11 @@ impl svc::stack::RecognizeRoute> for LogicalPerRequest { .or_else(|| http_request_authority_addr(req).ok()?.into_name_addr()) .or_else(|| http_request_host_addr(req).ok()?.into_name_addr()); - // Use the per-port inbound policy to determine whether the request is permitted. - let permit = match self.policy.check_authorized(self.client, &self.tls) { - Ok(permit) => permit, - Err(denied) => { - tracing::debug!(?logical, ?denied); - return Err(denied.into()); - } - }; - Ok(Logical { logical, addr: self.server, tls: self.tls.clone(), - permit, + permit: self.permit.clone(), // Use the request's HTTP version (i.e. as modified by orig-proto downgrading). http: req .version() @@ -319,10 +312,7 @@ impl Param for Logical { tls: self.tls.clone(), authority: self.logical.as_ref().map(|d| d.as_http_authority()), target_addr: self.addr.into(), - policy: metrics::PolicyLabels { - server: self.permit.server_labels.clone(), - authz: self.permit.authz_labels.clone(), - }, + policy: self.permit.labels.clone(), } .into() } @@ -355,6 +345,7 @@ impl tap::Inspect for Logical { } fn dst_labels(&self, _: &http::Request) -> Option<&tap::Labels> { + // TODO include policy labels here. None } diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index a236b04a74..2d769dfed9 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -7,8 +7,10 @@ pub use linkerd_app_core::proxy::http::{ use linkerd_app_core::{ config::{ProxyConfig, ServerConfig}, errors, http_tracing, identity, io, + metrics::ServerLabel, proxy::http, svc::{self, Param}, + transport::OrigDstAddr, Error, }; use tracing::debug_span; @@ -18,7 +20,9 @@ impl Inbound { where T: Param + Param - + Param>, + + Param> + + Param + + Param, T: Clone + Send + 'static, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static, H: svc::NewService + Clone + Send + Sync + Unpin + 'static, @@ -55,10 +59,13 @@ impl Inbound { // driven outside of the request path, so there's no need // for SpawnReady .push(svc::ConcurrencyLimitLayer::new(max_in_flight_requests)) - .push(svc::FailFast::layer("HTTP Server", dispatch_timeout)) - .push(rt.metrics.http_errors.clone()) + .push(svc::FailFast::layer("HTTP Server", dispatch_timeout)), + ) + .push(rt.metrics.http_errors.to_layer()) + .push_on_service( + svc::layers() // Synthesizes responses for proxy errors. - .push(errors::layer()) + .push(errors::respond::layer()) .push(http_tracing::server( rt.span_sink.clone(), super::trace_labels(), diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 65747c2df4..249e849bd5 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -8,7 +8,7 @@ use crate::{ }; use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response}; use linkerd_app_core::{ - errors::L5D_PROXY_ERROR, + errors::respond::L5D_PROXY_ERROR, identity, io, proxy::http, svc::{self, NewService, Param}, @@ -419,15 +419,21 @@ impl svc::Param for Target { authorizations: vec![policy::Authorization { authentication: policy::Authentication::Unauthenticated, networks: vec![std::net::IpAddr::from([192, 0, 2, 3]).into()], - labels: None.into_iter().collect(), + name: "testsaz".to_string(), }], - labels: Default::default(), + name: "testsrv".to_string(), }, ); policy } } +impl svc::Param for Target { + fn param(&self) -> policy::ServerLabel { + policy::ServerLabel("testsrv".to_string()) + } +} + impl svc::Param for Target { fn param(&self) -> http::normalize_uri::DefaultAuthority { http::normalize_uri::DefaultAuthority(None) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index b5fd3976b8..aadd71f569 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -10,21 +10,26 @@ mod accept; mod detect; pub mod direct; mod http; +mod metrics; pub mod policy; mod server; #[cfg(any(test, fuzzing))] pub(crate) mod test_util; -pub use self::policy::DefaultPolicy; +pub use self::{metrics::Metrics, policy::DefaultPolicy}; use linkerd_app_core::{ config::{ConnectConfig, ProxyConfig}, - drain, io, metrics, + drain, + http_tracing::OpenCensusSink, + io, proxy::tcp, + proxy::{identity::LocalCrtKey, tap}, svc, transport::{self, Remote, ServerAddr}, Error, NameMatch, ProxyRuntime, }; use std::{fmt::Debug, time::Duration}; +use thiserror::Error; use tracing::debug_span; #[cfg(fuzzing)] @@ -41,10 +46,34 @@ pub struct Config { #[derive(Clone)] pub struct Inbound { config: Config, - runtime: ProxyRuntime, + runtime: Runtime, stack: svc::Stack, } +#[derive(Clone)] +struct Runtime { + metrics: Metrics, + identity: Option, + tap: tap::Registry, + span_sink: OpenCensusSink, + drain: drain::Watch, +} + +// The inbound HTTP server handles gateway traffic; so gateway error types are defined here (so that +// error metrics can be recorded properly). + +#[derive(Debug, Error)] +#[error("no identity provided")] +pub struct GatewayIdentityRequired; + +#[derive(Debug, Error)] +#[error("bad gateway domain")] +pub struct GatewayDomainInvalid; + +#[derive(Debug, Error)] +#[error("gateway loop detected")] +pub struct GatewayLoop; + // === impl Inbound === impl Inbound { @@ -52,8 +81,26 @@ impl Inbound { &self.config } - pub fn runtime(&self) -> &ProxyRuntime { - &self.runtime + pub fn identity(&self) -> Option<&LocalCrtKey> { + self.runtime.identity.as_ref() + } + + pub fn proxy_metrics(&self) -> &metrics::Proxy { + &self.runtime.metrics.proxy + } + + /// A helper for gateways to instrument policy checks. + pub fn authorize_http( + &self, + ) -> impl svc::layer::Layer> + Clone { + policy::NewAuthorizeHttp::layer(self.runtime.metrics.http_authz.clone()) + } + + /// A helper for gateways to instrument policy checks. + pub fn authorize_tcp( + &self, + ) -> impl svc::layer::Layer> + Clone { + policy::NewAuthorizeTcp::layer(self.runtime.metrics.tcp_authz.clone()) } pub fn into_stack(self) -> svc::Stack { @@ -67,7 +114,7 @@ impl Inbound { /// Creates a new `Inbound` by replacing the inner stack, as modified by `f`. fn map_stack( self, - f: impl FnOnce(&Config, &ProxyRuntime, svc::Stack) -> svc::Stack, + f: impl FnOnce(&Config, &Runtime, svc::Stack) -> svc::Stack, ) -> Inbound { let stack = f(&self.config, &self.runtime, self.stack); Inbound { @@ -80,6 +127,13 @@ impl Inbound { impl Inbound<()> { pub fn new(config: Config, runtime: ProxyRuntime) -> Self { + let runtime = Runtime { + metrics: Metrics::new(runtime.metrics), + identity: runtime.identity, + tap: runtime.tap, + span_sink: runtime.span_sink, + drain: runtime.drain, + }; Self { config, runtime, @@ -87,6 +141,10 @@ impl Inbound<()> { } } + pub fn metrics(&self) -> Metrics { + self.runtime.metrics.clone() + } + pub fn with_stack(self, stack: S) -> Inbound { self.map_stack(move |_, _, _| svc::stack(stack)) } @@ -163,7 +221,7 @@ impl Inbound { self.map_stack(|_, rt, connect| { connect .push(transport::metrics::Client::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .push_make_thunk() .push_on_service( diff --git a/linkerd/app/inbound/src/metrics/authz.rs b/linkerd/app/inbound/src/metrics/authz.rs new file mode 100644 index 0000000000..74cf2a3890 --- /dev/null +++ b/linkerd/app/inbound/src/metrics/authz.rs @@ -0,0 +1,153 @@ +use crate::policy::{AllowPolicy, Permit}; +use linkerd_app_core::{ + metrics::{metrics, AuthzLabels, Counter, FmtMetrics, ServerLabel}, + transport::labels::TargetAddr, +}; +use parking_lot::Mutex; +use std::{collections::HashMap, sync::Arc}; + +metrics! { + inbound_http_authz_allow_total: Counter { + "The total number of inbound HTTP requests that were authorized" + }, + inbound_http_authz_deny_total: Counter { + "The total number of inbound HTTP requests that could not be processed due to a proxy error." + }, + + inbound_tcp_authz_allow_total: Counter { + "The total number of inbound TCP connections that were authorized" + }, + inbound_tcp_authz_deny_total: Counter { + "The total number of inbound TCP connections that were denied" + }, + inbound_tcp_authz_terminate_total: Counter { + "The total number of inbound TCP connections that were terminated due to an authorization change" + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct HttpAuthzMetrics(Arc); + +#[derive(Clone, Debug, Default)] +pub(crate) struct TcpAuthzMetrics(Arc); + +#[derive(Debug, Default)] +struct HttpInner { + allow: Mutex>, + deny: Mutex>, +} + +#[derive(Debug, Default)] +struct TcpInner { + allow: Mutex>, + deny: Mutex>, + terminate: Mutex>, +} + +fn server_labels(policy: &AllowPolicy) -> (TargetAddr, ServerLabel) { + (TargetAddr(policy.dst_addr().into()), policy.server_label()) +} + +fn authz_labels(permit: &Permit) -> (TargetAddr, AuthzLabels) { + (TargetAddr(permit.dst.into()), permit.labels.clone()) +} + +// === impl HttpAuthzMetrics === + +impl HttpAuthzMetrics { + pub fn allow(&self, permit: &Permit) { + self.0 + .allow + .lock() + .entry(authz_labels(permit)) + .or_default() + .incr(); + } + + pub fn deny(&self, policy: &AllowPolicy) { + self.0 + .deny + .lock() + .entry(server_labels(policy)) + .or_default() + .incr(); + } +} + +impl FmtMetrics for HttpAuthzMetrics { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let allow = self.0.allow.lock(); + if !allow.is_empty() { + inbound_http_authz_allow_total.fmt_help(f)?; + inbound_http_authz_allow_total.fmt_scopes(f, allow.iter(), |c| c)?; + } + drop(allow); + + let deny = self.0.deny.lock(); + if !deny.is_empty() { + inbound_http_authz_deny_total.fmt_help(f)?; + inbound_http_authz_deny_total.fmt_scopes(f, deny.iter(), |c| c)?; + } + drop(deny); + + Ok(()) + } +} + +// === impl TcpAuthzMetrics === + +impl TcpAuthzMetrics { + pub fn allow(&self, permit: &Permit) { + self.0 + .allow + .lock() + .entry(authz_labels(permit)) + .or_default() + .incr(); + } + + pub fn deny(&self, policy: &AllowPolicy) { + self.0 + .deny + .lock() + .entry(server_labels(policy)) + .or_default() + .incr(); + } + + pub fn terminate(&self, policy: &AllowPolicy) { + self.0 + .terminate + .lock() + .entry(server_labels(policy)) + .or_default() + .incr(); + } +} + +impl FmtMetrics for TcpAuthzMetrics { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let allow = self.0.allow.lock(); + if !allow.is_empty() { + inbound_tcp_authz_allow_total.fmt_help(f)?; + inbound_tcp_authz_allow_total.fmt_scopes(f, allow.iter(), |c| c)?; + } + drop(allow); + + let deny = self.0.deny.lock(); + if !deny.is_empty() { + inbound_tcp_authz_deny_total.fmt_help(f)?; + inbound_tcp_authz_deny_total.fmt_scopes(f, deny.iter(), |c| c)?; + } + drop(deny); + + let terminate = self.0.terminate.lock(); + if !terminate.is_empty() { + inbound_tcp_authz_terminate_total.fmt_help(f)?; + inbound_tcp_authz_terminate_total.fmt_scopes(f, terminate.iter(), |c| c)?; + } + drop(terminate); + + Ok(()) + } +} diff --git a/linkerd/app/inbound/src/metrics/error/http.rs b/linkerd/app/inbound/src/metrics/error/http.rs new file mode 100644 index 0000000000..81c83b5326 --- /dev/null +++ b/linkerd/app/inbound/src/metrics/error/http.rs @@ -0,0 +1,84 @@ +use super::ErrorKind; +use linkerd_app_core::{ + metrics::{metrics, Counter, FmtMetrics, ServerLabel}, + svc::{self, stack::NewMonitor}, + transport::{labels::TargetAddr, OrigDstAddr}, + Error, +}; +use parking_lot::Mutex; +use std::{collections::HashMap, sync::Arc}; + +metrics! { + inbound_http_errors_total: Counter { + "The total number of inbound HTTP requests that could not be processed due to a proxy error." + } +} + +#[derive(Clone, Debug, Default)] +pub struct HttpErrorMetrics(Arc>>); + +#[derive(Clone, Debug)] +pub struct MonitorHttpErrorMetrics { + labels: (TargetAddr, ServerLabel), + registry: HttpErrorMetrics, +} + +// === impl HttpErrorMetrics === + +impl HttpErrorMetrics { + pub fn to_layer(&self) -> impl svc::layer::Layer> + Clone { + NewMonitor::layer(self.clone()) + } +} + +impl svc::stack::MonitorNewService for HttpErrorMetrics +where + T: svc::Param + svc::Param, +{ + type MonitorService = MonitorHttpErrorMetrics; + + #[inline] + fn monitor(&mut self, target: &T) -> Self::MonitorService { + let OrigDstAddr(addr) = target.param(); + MonitorHttpErrorMetrics { + labels: (TargetAddr(addr), target.param()), + registry: self.clone(), + } + } +} + +impl FmtMetrics for HttpErrorMetrics { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let metrics = self.0.lock(); + if metrics.is_empty() { + return Ok(()); + } + inbound_http_errors_total.fmt_help(f)?; + inbound_http_errors_total.fmt_scopes(f, metrics.iter(), |c| c) + } +} + +// === impl MonitorHttpErrorMetrics === + +impl svc::stack::MonitorService for MonitorHttpErrorMetrics { + type MonitorResponse = Self; + + #[inline] + fn monitor_request(&mut self, _: &Req) -> Self::MonitorResponse { + self.clone() + } +} + +impl svc::stack::MonitorError for MonitorHttpErrorMetrics { + #[inline] + fn monitor_error(&mut self, e: &Error) { + if let Some(error) = ErrorKind::mk(&**e) { + self.registry + .0 + .lock() + .entry((error, self.labels.clone())) + .or_default() + .incr(); + } + } +} diff --git a/linkerd/app/inbound/src/metrics/error/mod.rs b/linkerd/app/inbound/src/metrics/error/mod.rs new file mode 100644 index 0000000000..4fb3a37c3f --- /dev/null +++ b/linkerd/app/inbound/src/metrics/error/mod.rs @@ -0,0 +1,71 @@ +mod http; +mod tcp; + +pub(crate) use self::{http::HttpErrorMetrics, tcp::TcpErrorMetrics}; +use crate::{ + policy::{DeniedUnauthorized, DeniedUnknownPort}, + GatewayDomainInvalid, GatewayIdentityRequired, GatewayLoop, +}; +use linkerd_app_core::{errors::FailFastError, metrics::FmtLabels, tls}; +use std::fmt; + +/// Inbound proxy error types. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +enum ErrorKind { + DeniedUnknown, + FailFast, + GatewayDomainInvalid, + GatewayIdentityRequired, + GatewayLoop, + Io, + TlsDetectTimeout, + Unexpected, +} + +// === impl ErrorKind === + +impl ErrorKind { + fn mk(err: &(dyn std::error::Error + 'static)) -> Option { + if err.is::() { + // Unauthorized metrics are tracked separately.and are not considered to be errors. + None + } else if err.is::() { + Some(ErrorKind::DeniedUnknown) + } else if err.is::() { + Some(ErrorKind::FailFast) + } else if err.is::() { + Some(ErrorKind::Io) + } else if err.is::() { + Some(ErrorKind::TlsDetectTimeout) + } else if err.is::() { + Some(ErrorKind::GatewayDomainInvalid) + } else if err.is::() { + Some(ErrorKind::GatewayIdentityRequired) + } else if err.is::() { + Some(ErrorKind::GatewayLoop) + } else if let Some(e) = err.source() { + Self::mk(e) + } else { + Some(ErrorKind::Unexpected) + } + } +} + +impl FmtLabels for ErrorKind { + fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "error=\"{}\"", + match self { + ErrorKind::DeniedUnknown => "unknown port denied", + ErrorKind::FailFast => "failfast", + ErrorKind::TlsDetectTimeout => "tls detection timeout", + ErrorKind::GatewayIdentityRequired => "gateway identity required", + ErrorKind::GatewayLoop => "gateway loop", + ErrorKind::GatewayDomainInvalid => "gateway domain invalid", + ErrorKind::Io => "i/o", + ErrorKind::Unexpected => "unexpected", + } + ) + } +} diff --git a/linkerd/app/inbound/src/metrics/error/tcp.rs b/linkerd/app/inbound/src/metrics/error/tcp.rs new file mode 100644 index 0000000000..c86d540506 --- /dev/null +++ b/linkerd/app/inbound/src/metrics/error/tcp.rs @@ -0,0 +1,83 @@ +use super::ErrorKind; +use linkerd_app_core::{ + metrics::{metrics, Counter, FmtMetrics}, + svc::{self, stack::NewMonitor}, + transport::{labels::TargetAddr, OrigDstAddr}, + Error, +}; +use parking_lot::Mutex; +use std::{collections::HashMap, sync::Arc}; + +metrics! { + inbound_tcp_errors_total: Counter { + "The total number of inbound TCP connections that could not be processed due to a proxy error." + } +} + +#[derive(Clone, Debug, Default)] +pub struct TcpErrorMetrics(Arc>>); + +#[derive(Clone, Debug)] +pub struct MonitorTcpErrorMetrics { + target_addr: TargetAddr, + registry: TcpErrorMetrics, +} + +// === impl TcpErrorMetrics === + +impl TcpErrorMetrics { + pub fn to_layer(&self) -> impl svc::layer::Layer> + Clone { + NewMonitor::layer(self.clone()) + } +} + +impl svc::stack::MonitorNewService for TcpErrorMetrics +where + T: svc::Param, +{ + type MonitorService = MonitorTcpErrorMetrics; + + fn monitor(&mut self, target: &T) -> Self::MonitorService { + let OrigDstAddr(addr) = target.param(); + MonitorTcpErrorMetrics { + target_addr: TargetAddr(addr), + registry: self.clone(), + } + } +} + +impl FmtMetrics for TcpErrorMetrics { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let metrics = self.0.lock(); + if metrics.is_empty() { + return Ok(()); + } + inbound_tcp_errors_total.fmt_help(f)?; + inbound_tcp_errors_total.fmt_scopes(f, metrics.iter(), |c| c) + } +} + +// === impl MonitorTcpErrorMetrics === + +impl svc::stack::MonitorService for MonitorTcpErrorMetrics { + type MonitorResponse = Self; + + #[inline] + fn monitor_request(&mut self, _: &Req) -> Self::MonitorResponse { + self.clone() + } +} + +impl svc::stack::MonitorError for MonitorTcpErrorMetrics { + #[inline] + fn monitor_error(&mut self, e: &Error) { + if let Some(error) = ErrorKind::mk(&**e) { + self.registry + .0 + .lock() + .entry((error, self.target_addr)) + .or_default() + .incr(); + } + } +} diff --git a/linkerd/app/inbound/src/metrics/mod.rs b/linkerd/app/inbound/src/metrics/mod.rs new file mode 100644 index 0000000000..78473fd326 --- /dev/null +++ b/linkerd/app/inbound/src/metrics/mod.rs @@ -0,0 +1,54 @@ +//! Inbound proxy metrics. +//! +//! While this module is very similar to `outbound::metrics`, it is bound to `inbound_`-prefixed +//! metrics and derives its labels from inbound-specific types. Eventually, we won't rely on the +//! legacy `proxy` metrics and all inbound metrics will be defined in this module. +//! +//! TODO(ver) We use a `Mutex` to store our error metrics because we don't expect these registries +//! to be updated frequently or in a performance-critical area. We should probably look to use +//! `DashMap` as we migrate other metrics registries. + +pub(crate) mod authz; +pub(crate) mod error; + +pub use linkerd_app_core::metrics::*; + +/// Holds outbound proxy metrics. +#[derive(Clone, Debug)] +pub struct Metrics { + pub(crate) http_authz: authz::HttpAuthzMetrics, + pub http_errors: error::HttpErrorMetrics, + + pub(crate) tcp_authz: authz::TcpAuthzMetrics, + pub tcp_errors: error::TcpErrorMetrics, + + /// Holds metrics that are common to both inbound and outbound proxies. These metrics are + /// reported separately + pub proxy: Proxy, +} + +impl Metrics { + pub(crate) fn new(proxy: Proxy) -> Self { + Self { + http_authz: authz::HttpAuthzMetrics::default(), + http_errors: error::HttpErrorMetrics::default(), + tcp_authz: authz::TcpAuthzMetrics::default(), + tcp_errors: error::TcpErrorMetrics::default(), + proxy, + } + } +} + +impl FmtMetrics for Metrics { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.http_authz.fmt_metrics(f)?; + self.http_errors.fmt_metrics(f)?; + + self.tcp_authz.fmt_metrics(f)?; + self.tcp_errors.fmt_metrics(f)?; + + // XXX: Proxy metrics are reported elsewhere. + + Ok(()) + } +} diff --git a/linkerd/app/inbound/src/policy/authorize/http.rs b/linkerd/app/inbound/src/policy/authorize/http.rs new file mode 100644 index 0000000000..f8a85d950c --- /dev/null +++ b/linkerd/app/inbound/src/policy/authorize/http.rs @@ -0,0 +1,108 @@ +use crate::metrics::authz::HttpAuthzMetrics; + +use super::super::{AllowPolicy, Permit}; +use futures::{future, TryFutureExt}; +use linkerd_app_core::{ + errors::HttpError, + svc::{self, ServiceExt}, + tls, + transport::{ClientAddr, Remote}, + Error, +}; +use std::task; + +/// A middleware that enforces policy on each HTTP request. +/// +/// This enforcement is done lazily on each request so that policy updates are honored as the +/// connection progresses. +/// +/// The inner service is created for each request, so it's expected that this is combined with +/// caching. +#[derive(Clone, Debug)] +pub struct NewAuthorizeHttp { + metrics: HttpAuthzMetrics, + inner: N, +} + +#[derive(Clone, Debug)] +pub struct AuthorizeHttp { + target: T, + client_addr: Remote, + tls: tls::ConditionalServerTls, + policy: AllowPolicy, + metrics: HttpAuthzMetrics, + inner: N, +} + +// === impl NewAuthorizeHttp === + +impl NewAuthorizeHttp { + pub(crate) fn layer( + metrics: HttpAuthzMetrics, + ) -> impl svc::layer::Layer + Clone { + svc::layer::mk(move |inner| Self { + metrics: metrics.clone(), + inner, + }) + } +} + +impl svc::NewService for NewAuthorizeHttp +where + T: svc::Param + + svc::Param> + + svc::Param, + N: Clone, +{ + type Service = AuthorizeHttp; + + fn new_service(&mut self, target: T) -> Self::Service { + let client_addr = target.param(); + let tls = target.param(); + let policy = target.param(); + AuthorizeHttp { + target, + client_addr, + tls, + policy, + metrics: self.metrics.clone(), + inner: self.inner.clone(), + } + } +} + +// === impl AuthorizeHttp === + +impl svc::Service for AuthorizeHttp +where + T: Clone, + N: svc::NewService<(Permit, T), Service = S>, + S: svc::Service, + S::Error: Into, +{ + type Response = S::Response; + type Error = Error; + type Future = future::Either< + future::ErrInto, Error>, + future::Ready>, + >; + + #[inline] + fn poll_ready(&mut self, _: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Req) -> Self::Future { + match self.policy.check_authorized(self.client_addr, &self.tls) { + Ok(permit) => { + self.metrics.allow(&permit); + let svc = self.inner.new_service((permit, self.target.clone())); + future::Either::Left(svc.oneshot(req).err_into::()) + } + Err(e) => { + self.metrics.deny(&self.policy); + future::Either::Right(future::err(HttpError::forbidden(e).into())) + } + } + } +} diff --git a/linkerd/app/inbound/src/policy/authorize/mod.rs b/linkerd/app/inbound/src/policy/authorize/mod.rs new file mode 100644 index 0000000000..9dcf2185a4 --- /dev/null +++ b/linkerd/app/inbound/src/policy/authorize/mod.rs @@ -0,0 +1,4 @@ +mod http; +mod tcp; + +pub use self::{http::NewAuthorizeHttp, tcp::NewAuthorizeTcp}; diff --git a/linkerd/app/inbound/src/policy/authorize/tcp.rs b/linkerd/app/inbound/src/policy/authorize/tcp.rs new file mode 100644 index 0000000000..e14b7f29a4 --- /dev/null +++ b/linkerd/app/inbound/src/policy/authorize/tcp.rs @@ -0,0 +1,163 @@ +use super::super::{AllowPolicy, DeniedUnauthorized, Permit}; +use crate::metrics::authz::TcpAuthzMetrics; +use futures::future; +use linkerd_app_core::{ + svc, tls, + transport::{ClientAddr, Remote}, + Error, Result, +}; +use std::{future::Future, pin::Pin, task}; + +/// A middleware that enforces policy on each TCP connection. When connection is authorized, we +/// continue to monitor the policy for changes and, if the connection is no longer authorized, it is +/// dropped/closed. +/// +/// Metrics are reported to the `TcpAuthzMetrics` struct. +#[derive(Clone, Debug)] +pub struct NewAuthorizeTcp { + inner: N, + metrics: TcpAuthzMetrics, +} + +#[derive(Clone, Debug)] +pub enum AuthorizeTcp { + Authorized(Authorized), + Unauthorized(Unauthorized), +} + +#[derive(Clone, Debug)] +pub struct Authorized { + inner: S, + policy: AllowPolicy, + client: Remote, + tls: tls::ConditionalServerTls, + metrics: TcpAuthzMetrics, +} + +#[derive(Clone, Debug)] +pub struct Unauthorized { + deny: DeniedUnauthorized, +} + +// === impl NewAuthorizeTcp === + +impl NewAuthorizeTcp { + pub(crate) fn layer( + metrics: TcpAuthzMetrics, + ) -> impl svc::layer::Layer + Clone { + svc::layer::mk(move |inner| Self { + inner, + metrics: metrics.clone(), + }) + } +} + +impl svc::NewService for NewAuthorizeTcp +where + T: svc::Param + + svc::Param> + + svc::Param, + N: svc::NewService<(Permit, T)>, +{ + type Service = AuthorizeTcp; + + fn new_service(&mut self, target: T) -> Self::Service { + let client = target.param(); + let tls = target.param(); + let policy: AllowPolicy = target.param(); + match policy.check_authorized(client, &tls) { + Ok(permit) => { + tracing::debug!(?permit, "Connection authorized"); + + // This new services requires a ClientAddr, so it must necessarily be built for each + // connection. So we can just increment the counter here since the service can only + // be used at most once. + self.metrics.allow(&permit); + + let inner = self.inner.new_service((permit, target)); + AuthorizeTcp::Authorized(Authorized { + inner, + policy, + client, + tls, + metrics: self.metrics.clone(), + }) + } + Err(deny) => { + tracing::info!(?deny, "Connection denied"); + self.metrics.deny(&policy); + AuthorizeTcp::Unauthorized(Unauthorized { deny }) + } + } + } +} + +// === impl AuthorizeTcp === + +impl svc::Service for AuthorizeTcp +where + S: svc::Service, + S::Error: Into, + S::Future: Send + 'static, +{ + type Response = (); + type Error = Error; + type Future = future::Either< + Pin> + Send + 'static>>, + future::Ready>, + >; + + #[inline] + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll> { + match self { + Self::Authorized(Authorized { ref mut inner, .. }) => { + inner.poll_ready(cx).map_err(Into::into) + } + + // If connections are not authorized, fail it immediately. + Self::Unauthorized(Unauthorized { deny }) => { + task::Poll::Ready(Err(deny.clone().into())) + } + } + } + + fn call(&mut self, io: I) -> Self::Future { + match self { + // If the connection is authorized, pass it to the inner service and stop processing the + // connection if the authorization's state changes to no longer permit the request. + Self::Authorized(Authorized { + inner, + client, + tls, + policy, + metrics, + }) => { + let client = *client; + let tls = tls.clone(); + let mut policy = policy.clone(); + let metrics = metrics.clone(); + + // FIXME increment counter. + + let call = inner.call(io); + future::Either::Left(Box::pin(async move { + tokio::pin!(call); + loop { + tokio::select! { + res = &mut call => return res.map_err(Into::into), + _ = policy.changed() => { + if let Err(denied) = policy.check_authorized(client, &tls) { + metrics.terminate(&policy); + tracing::info!(%denied, "Connection terminated"); + return Err(denied.into()); + } + } + }; + } + })) + } + + Self::Unauthorized(_deny) => unreachable!("poll_ready must be called"), + } + } +} diff --git a/linkerd/app/inbound/src/policy/defaults.rs b/linkerd/app/inbound/src/policy/defaults.rs index 871ac063a6..a2cc6fa25d 100644 --- a/linkerd/app/inbound/src/policy/defaults.rs +++ b/linkerd/app/inbound/src/policy/defaults.rs @@ -1,7 +1,5 @@ use linkerd_app_core::{IpNet, Ipv4Net, Ipv6Net}; -use linkerd_server_policy::{ - Authentication, Authorization, Labels, Protocol, ServerPolicy, Suffix, -}; +use linkerd_server_policy::{Authentication, Authorization, Protocol, ServerPolicy, Suffix}; use std::time::Duration; pub fn all_authenticated(timeout: Duration) -> ServerPolicy { @@ -72,17 +70,13 @@ fn mk( authentication: Authentication, timeout: Duration, ) -> ServerPolicy { - let labels = Some(("name".to_string(), name.to_string())) - .into_iter() - .collect::(); - ServerPolicy { protocol: Protocol::Detect { timeout }, authorizations: vec![Authorization { networks: nets.into_iter().map(Into::into).collect(), authentication, - labels: labels.clone(), + name: name.to_string(), }], - labels, + name: name.to_string(), } } diff --git a/linkerd/app/inbound/src/policy/discover.rs b/linkerd/app/inbound/src/policy/discover.rs index 953acf6293..dc32abea23 100644 --- a/linkerd/app/inbound/src/policy/discover.rs +++ b/linkerd/app/inbound/src/policy/discover.rs @@ -161,19 +161,30 @@ fn to_policy(proto: api::Server) -> Result { authn => return Err(format!("no authentication provided: {:?}", authn).into()), }; + let name = labels + .get("name") + .ok_or("authorization missing 'name' label")? + .clone(); + Ok(Authorization { networks, authentication: authn, - labels: labels.into_iter().collect(), + name, }) }, ) .collect::>>()?; + let name = proto + .labels + .get("name") + .ok_or("server missing 'name' label")? + .clone(); + Ok(ServerPolicy { protocol, authorizations, - labels: proto.labels.into_iter().collect(), + name, }) } diff --git a/linkerd/app/inbound/src/policy/mod.rs b/linkerd/app/inbound/src/policy/mod.rs index 7ce02b9910..10e856aa37 100644 --- a/linkerd/app/inbound/src/policy/mod.rs +++ b/linkerd/app/inbound/src/policy/mod.rs @@ -1,3 +1,4 @@ +mod authorize; mod config; pub mod defaults; mod discover; @@ -5,19 +6,33 @@ mod store; #[cfg(test)] mod tests; +pub use self::authorize::{NewAuthorizeHttp, NewAuthorizeTcp}; pub use self::config::Config; pub(crate) use self::store::Store; + +pub use linkerd_app_core::metrics::{AuthzLabels, ServerLabel}; use linkerd_app_core::{ tls, - transport::{ClientAddr, DeniedUnauthorized, DeniedUnknownPort, OrigDstAddr, Remote}, + transport::{ClientAddr, OrigDstAddr, Remote}, Result, }; -pub use linkerd_server_policy::{ - Authentication, Authorization, Labels, Protocol, ServerPolicy, Suffix, -}; +pub use linkerd_server_policy::{Authentication, Authorization, Protocol, ServerPolicy, Suffix}; +use thiserror::Error; use tokio::sync::watch; -pub(crate) trait CheckPolicy { +#[derive(Clone, Debug, Error)] +#[error("connection denied on unknown port {0}")] +pub struct DeniedUnknownPort(pub u16); + +#[derive(Clone, Debug, Error)] +#[error("unauthorized connection from {client_addr} with identity {tls:?} to {dst_addr}")] +pub struct DeniedUnauthorized { + pub client_addr: Remote, + pub dst_addr: OrigDstAddr, + pub tls: tls::ConditionalServerTls, +} + +pub trait CheckPolicy { /// Checks that the destination address is configured to allow traffic. fn check_policy(&self, dst: OrigDstAddr) -> Result; } @@ -29,17 +44,17 @@ pub enum DefaultPolicy { } #[derive(Clone, Debug)] -pub(crate) struct AllowPolicy { +pub struct AllowPolicy { dst: OrigDstAddr, server: watch::Receiver, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub(crate) struct Permit { +pub struct Permit { + pub dst: OrigDstAddr, pub protocol: Protocol, - pub server_labels: Labels, - pub authz_labels: Labels, + pub labels: AuthzLabels, } // === impl DefaultPolicy === @@ -69,8 +84,20 @@ impl AllowPolicy { } #[inline] - pub(crate) fn server_labels(&self) -> Labels { - self.server.borrow().labels.clone() + pub fn dst_addr(&self) -> OrigDstAddr { + self.dst + } + + #[inline] + pub fn server_label(&self) -> ServerLabel { + ServerLabel(self.server.borrow().name.clone()) + } + + async fn changed(&mut self) { + if self.server.changed().await.is_err() { + // If the sender was dropped, then there can be no further changes. + futures::future::pending::<()>().await; + } } /// Checks whether the destination port's `AllowPolicy` is authorized to accept connections @@ -85,7 +112,7 @@ impl AllowPolicy { if authz.networks.iter().any(|n| n.contains(&client_addr.ip())) { match authz.authentication { Authentication::Unauthenticated => { - return Ok(Permit::new(&*server, authz)); + return Ok(Permit::new(self.dst, &*server, authz)); } Authentication::TlsUnauthenticated => { @@ -93,7 +120,7 @@ impl AllowPolicy { .. }) = tls { - return Ok(Permit::new(&*server, authz)); + return Ok(Permit::new(self.dst, &*server, authz)); } } @@ -109,7 +136,7 @@ impl AllowPolicy { if identities.contains(id.as_ref()) || suffixes.iter().any(|s| s.contains(id.as_ref())) { - return Ok(Permit::new(&*server, authz)); + return Ok(Permit::new(self.dst, &*server, authz)); } } } @@ -128,11 +155,14 @@ impl AllowPolicy { // === impl Permit === impl Permit { - fn new(server: &ServerPolicy, authz: &Authorization) -> Self { + fn new(dst: OrigDstAddr, server: &ServerPolicy, authz: &Authorization) -> Self { Self { + dst, protocol: server.protocol, - server_labels: server.labels.clone(), - authz_labels: authz.labels.clone(), + labels: AuthzLabels { + server: ServerLabel(server.name.clone()), + authz: authz.name.clone(), + }, } } } diff --git a/linkerd/app/inbound/src/policy/store.rs b/linkerd/app/inbound/src/policy/store.rs index 097f142aaa..ab1813f733 100644 --- a/linkerd/app/inbound/src/policy/store.rs +++ b/linkerd/app/inbound/src/policy/store.rs @@ -11,7 +11,7 @@ use std::{ use tokio::sync::watch; #[derive(Clone, Debug)] -pub(crate) struct Store { +pub struct Store { // When None, the default policy is 'deny'. default: Option, ports: Arc>, diff --git a/linkerd/app/inbound/src/policy/tests.rs b/linkerd/app/inbound/src/policy/tests.rs index 6e250f96a2..4137c6a6fa 100644 --- a/linkerd/app/inbound/src/policy/tests.rs +++ b/linkerd/app/inbound/src/policy/tests.rs @@ -9,13 +9,9 @@ fn unauthenticated_allowed() { authorizations: vec![Authorization { authentication: Authentication::Unauthenticated, networks: vec!["192.0.2.0/24".parse().unwrap()], - labels: vec![("name".to_string(), "unauth".to_string())] - .into_iter() - .collect(), + name: "unauth".to_string(), }], - labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), + name: "test".to_string(), }; let (policies, _tx) = Store::fixed(policy.clone(), None); @@ -31,13 +27,12 @@ fn unauthenticated_allowed() { assert_eq!( permitted, Permit { + dst: orig_dst_addr(), protocol: policy.protocol, - server_labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), - authz_labels: vec![("name".to_string(), "unauth".to_string()),] - .into_iter() - .collect() + labels: AuthzLabels { + server: ServerLabel("test".to_string()), + authz: "unauth".to_string(), + } } ); } @@ -52,13 +47,9 @@ fn authenticated_identity() { identities: vec![client_id().to_string()].into_iter().collect(), }, networks: vec!["192.0.2.0/24".parse().unwrap()], - labels: vec![("name".to_string(), "tls-auth".to_string())] - .into_iter() - .collect(), + name: "tls-auth".to_string(), }], - labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), + name: "test".to_string(), }; let (policies, _tx) = Store::fixed(policy.clone(), None); @@ -77,13 +68,12 @@ fn authenticated_identity() { assert_eq!( permitted, Permit { + dst: orig_dst_addr(), protocol: policy.protocol, - server_labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), - authz_labels: vec![("name".to_string(), "tls-auth".to_string()),] - .into_iter() - .collect() + labels: AuthzLabels { + server: ServerLabel("test".to_string()), + authz: "tls-auth".to_string(), + } } ); @@ -113,13 +103,9 @@ fn authenticated_suffix() { ])], }, networks: vec!["192.0.2.0/24".parse().unwrap()], - labels: vec![("name".to_string(), "tls-auth".to_string())] - .into_iter() - .collect(), + name: "tls-auth".to_string(), }], - labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), + name: "test".to_string(), }; let (policies, _tx) = Store::fixed(policy.clone(), None); @@ -137,13 +123,12 @@ fn authenticated_suffix() { .check_authorized(client_addr(), &tls) .expect("unauthenticated connection must be permitted"), Permit { + dst: orig_dst_addr(), protocol: policy.protocol, - server_labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), - authz_labels: vec![("name".to_string(), "tls-auth".to_string()),] - .into_iter() - .collect() + labels: AuthzLabels { + server: ServerLabel("test".to_string()), + authz: "tls-auth".to_string(), + } } ); @@ -167,13 +152,9 @@ fn tls_unauthenticated() { authorizations: vec![Authorization { authentication: Authentication::TlsUnauthenticated, networks: vec!["192.0.2.0/24".parse().unwrap()], - labels: vec![("name".to_string(), "tls-unauth".to_string())] - .into_iter() - .collect(), + name: "tls-unauth".to_string(), }], - labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), + name: "test".to_string(), }; let (policies, _tx) = Store::fixed(policy.clone(), None); @@ -191,13 +172,12 @@ fn tls_unauthenticated() { .check_authorized(client_addr(), &tls) .expect("unauthenticated connection must be permitted"), Permit { + dst: orig_dst_addr(), protocol: policy.protocol, - server_labels: vec![("name".to_string(), "test".to_string())] - .into_iter() - .collect(), - authz_labels: vec![("name".to_string(), "tls-unauth".to_string()),] - .into_iter() - .collect() + labels: AuthzLabels { + server: ServerLabel("test".to_string()), + authz: "tls-unauth".to_string(), + } } ); diff --git a/linkerd/app/inbound/src/server.rs b/linkerd/app/inbound/src/server.rs index 23e4c8237f..a27384953a 100644 --- a/linkerd/app/inbound/src/server.rs +++ b/linkerd/app/inbound/src/server.rs @@ -1,4 +1,4 @@ -use crate::{direct, Inbound}; +use crate::{direct, policy, Inbound}; use futures::Stream; use linkerd_app_core::{ dns, io, metrics, profiles, serve, svc, @@ -16,12 +16,24 @@ struct TcpEndpoint { // === impl Inbound === impl Inbound<()> { + pub async fn build_policies( + &self, + dns: dns::Resolver, + control_metrics: metrics::ControlHttp, + ) -> policy::Store { + self.config + .policy + .clone() + .build(dns, control_metrics, self.runtime.identity.clone()) + .await + .expect("Failed to fetch port policy") + } + pub async fn serve( self, addr: Local, listen: impl Stream> + Send + Sync + 'static, - dns: dns::Resolver, - control_metrics: metrics::ControlHttp, + policies: impl policy::CheckPolicy + Clone + Send + Sync + 'static, profiles: P, gateway: G, ) where @@ -49,14 +61,6 @@ impl Inbound<()> { .instrument(|_: &_| debug_span!("tcp")) .into_inner(); - let policies = self - .config - .policy - .clone() - .build(dns, control_metrics, self.runtime.identity.clone()) - .await - .expect("Failed to fetch port policy"); - // Handles connections that target the inbound proxy port. let direct = self .clone() diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index 67c902ceff..bfababe125 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -56,9 +56,9 @@ pub fn default_config() -> Config { authorizations: vec![Authorization { authentication: Authentication::Unauthenticated, networks: vec![Default::default()], - labels: Default::default(), + name: "testsaz".to_string(), }], - labels: Default::default(), + name: "testsrv".to_string(), } .into(), ports: Default::default(), @@ -68,12 +68,12 @@ pub fn default_config() -> Config { } pub fn runtime() -> (ProxyRuntime, drain::Signal) { - let (metrics, _) = metrics::Metrics::new(std::time::Duration::from_secs(10)); let (drain_tx, drain) = drain::channel(); let (tap, _) = tap::new(); + let (metrics, _) = metrics::Metrics::new(std::time::Duration::from_secs(10)); let runtime = ProxyRuntime { identity: None, - metrics: metrics.outbound, + metrics: metrics.proxy, tap, span_sink: None, drain, diff --git a/linkerd/app/integration/src/tests/telemetry/mod.rs b/linkerd/app/integration/src/tests/telemetry/mod.rs index 26fd8e151e..4d56993e73 100644 --- a/linkerd/app/integration/src/tests/telemetry/mod.rs +++ b/linkerd/app/integration/src/tests/telemetry/mod.rs @@ -3,7 +3,7 @@ // particular appears to do nothing... T_T #![allow(unused_imports)] -mod tcp_accept_errors; +mod tcp_errors; use crate::*; use std::io::Read; @@ -144,8 +144,7 @@ impl TcpFixture { .label("peer", "src") .label("tls", "disabled") .label("target_addr", orig_dst) - .label("srv_name", "default:all-unauthenticated") - .label("saz_name", "default:all-unauthenticated"); + .label("srv_name", "default:all-unauthenticated"); let dst_labels = metrics::labels() .label("direction", "inbound") diff --git a/linkerd/app/integration/src/tests/telemetry/tcp_accept_errors.rs b/linkerd/app/integration/src/tests/telemetry/tcp_errors.rs similarity index 95% rename from linkerd/app/integration/src/tests/telemetry/tcp_accept_errors.rs rename to linkerd/app/integration/src/tests/telemetry/tcp_errors.rs index deb598f652..ffbe88d173 100644 --- a/linkerd/app/integration/src/tests/telemetry/tcp_accept_errors.rs +++ b/linkerd/app/integration/src/tests/telemetry/tcp_errors.rs @@ -3,7 +3,7 @@ use std::time::SystemTime; const TIMEOUT: Duration = Duration::from_millis(640); // 640ms ought to be enough for anybody. -const METRIC: &str = "inbound_tcp_accept_errors_total"; +const METRIC: &str = "inbound_tcp_errors_total"; /// A helper that builds a proxy with the above detect timeout and a TCP server that always drops /// the accepted socket. @@ -78,7 +78,7 @@ async fn inbound_timeout() { .await; metric(&proxy) - .label("error", "tls_detect_timeout") + .label("error", "tls detection timeout") .value(1u64) .assert_in(&metrics) .await; @@ -98,7 +98,7 @@ async fn inbound_io_err() { drop(tcp_client); metric(&proxy) - .label("error", "io") + .label("error", "i/o") .value(1u64) .assert_in(&metrics) .await; @@ -124,7 +124,7 @@ async fn inbound_success() { let no_tls_client = client::tcp(proxy.inbound); let metric = metric(&proxy) - .label("error", "tls_detect_timeout") + .label("error", "tls detection timeout") .value(1u64); // Connect with TLS. The metric should not be incremented. @@ -155,8 +155,8 @@ async fn inbound_multi() { let client = client::tcp(proxy.inbound); let metric = metric(&proxy); - let timeout_metric = metric.clone().label("error", "tls_detect_timeout"); - let io_metric = metric.label("error", "io"); + let timeout_metric = metric.clone().label("error", "tls detection timeout"); + let io_metric = metric.label("error", "i/o"); let tcp_client = client.connect().await; @@ -206,8 +206,8 @@ async fn inbound_direct_multi() { let client = client::tcp(proxy.inbound); let metric = metrics::metric(METRIC).label("target_addr", proxy.inbound); - let timeout_metric = metric.clone().label("error", "tls_detect_timeout"); - let no_tls_metric = metric.clone().label("error", "other"); + let timeout_metric = metric.clone().label("error", "tls detection timeout"); + let no_tls_metric = metric.clone().label("error", "unexpected"); let tcp_client = client.connect().await; @@ -276,7 +276,7 @@ async fn inbound_direct_success() { let metric = metrics::metric(METRIC) .label("target_addr", proxy1.inbound) - .label("error", "tls_detect_timeout") + .label("error", "tls detection timeout") .value(1u64); // Connect with TLS. The metric should not be incremented. diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index 1f8ce64cd2..4d643e37d0 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -21,6 +21,7 @@ futures = { version = "0.3", default-features = false } linkerd-app-core = { path = "../core" } linkerd-http-retry = { path = "../../http-retry" } linkerd-identity = { path = "../../identity" } +parking_lot = "0.11" thiserror = "1.0" tokio = { version = "1", features = ["sync"] } tower = { version = "0.4.8", features = ["util"] } diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index ecda3f8cb5..3f9c526639 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -65,7 +65,12 @@ impl Outbound { // service in a background task so it becomes ready without // new requests. .push(svc::layer::mk(svc::SpawnReady::new)) - .push(rt.metrics.stack.layer(crate::stack_labels("tcp", "server"))) + .push( + rt.metrics + .proxy + .stack + .layer(crate::stack_labels("tcp", "server")), + ) .push(svc::FailFast::layer( "TCP Server", config.proxy.dispatch_timeout, @@ -73,12 +78,12 @@ impl Outbound { .push_spawn_buffer(config.proxy.buffer_capacity), ) .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .push_cache(config.proxy.cache_max_idle_age) .instrument(|a: &tcp::Accept| info_span!("server", orig_dst = %a.orig_dst)) .push_request_filter(|t: T| tcp::Accept::try_from(t.param())) - .push(rt.metrics.tcp_accept_errors.layer()) + .push(rt.metrics.tcp_errors.to_layer()) .push(svc::BoxNewService::layer()) .check_new_service::() }) diff --git a/linkerd/app/outbound/src/http/endpoint.rs b/linkerd/app/outbound/src/http/endpoint.rs index bd7ba4772a..254e70ed06 100644 --- a/linkerd/app/outbound/src/http/endpoint.rs +++ b/linkerd/app/outbound/src/http/endpoint.rs @@ -43,6 +43,7 @@ impl Outbound { .push(tap::NewTapHttp::layer(rt.tap.clone())) .push( rt.metrics + .proxy .http_endpoint .to_layer::(), ) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 17c5ac238a..7fdc530a71 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -64,6 +64,7 @@ impl Outbound { .push(http::BoxRequest::layer()) .push( rt.metrics + .proxy .stack .layer(stack_labels("http", "balance.endpoint")), ) @@ -84,7 +85,12 @@ impl Outbound { crate::EWMA_DEFAULT_RTT, crate::EWMA_DECAY, )) - .push(rt.metrics.stack.layer(stack_labels("http", "balancer"))) + .push( + rt.metrics + .proxy + .stack + .layer(stack_labels("http", "balancer")), + ) .push(svc::layer::mk(svc::SpawnReady::new)) .push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)) .push(http::BoxResponse::layer()), @@ -110,7 +116,12 @@ impl Outbound { .push_on_service( svc::layers() .push(svc::layer::mk(svc::SpawnReady::new)) - .push(rt.metrics.stack.layer(stack_labels("http", "logical"))) + .push( + rt.metrics + .proxy + .stack + .layer(stack_labels("http", "logical")), + ) .push(svc::FailFast::layer("HTTP Logical", dispatch_timeout)) .push_spawn_buffer(buffer_capacity), ) @@ -122,6 +133,7 @@ impl Outbound { .push_on_service(http::BoxRequest::layer()) .push( rt.metrics + .proxy .http_route_actual .to_layer::(), ) @@ -132,11 +144,16 @@ impl Outbound { // with both body types. .push_on_service(http::BoxRequest::erased()) // Sets an optional retry policy. - .push(retry::layer(rt.metrics.http_route_retry.clone())) + .push(retry::layer(rt.metrics.proxy.http_route_retry.clone())) // Sets an optional request timeout. .push(http::MakeTimeoutLayer::default()) // Records per-route metrics. - .push(rt.metrics.http_route.to_layer::()) + .push( + rt.metrics + .proxy + .http_route + .to_layer::(), + ) // Sets the per-route response classifier as a request // extension. .push(classify::NewClassify::layer()) diff --git a/linkerd/app/outbound/src/http/mod.rs b/linkerd/app/outbound/src/http/mod.rs index 37451c358a..8666431ebd 100644 --- a/linkerd/app/outbound/src/http/mod.rs +++ b/linkerd/app/outbound/src/http/mod.rs @@ -5,6 +5,7 @@ mod peer_proxy_errors; mod require_id_header; mod server; +pub(crate) use self::require_id_header::IdentityRequired; use crate::tcp; pub use linkerd_app_core::proxy::http::*; use linkerd_app_core::{ diff --git a/linkerd/app/outbound/src/http/peer_proxy_errors.rs b/linkerd/app/outbound/src/http/peer_proxy_errors.rs index 56155d4418..636f530f2a 100644 --- a/linkerd/app/outbound/src/http/peer_proxy_errors.rs +++ b/linkerd/app/outbound/src/http/peer_proxy_errors.rs @@ -1,5 +1,5 @@ use futures::prelude::*; -use linkerd_app_core::{errors::L5D_PROXY_ERROR, proxy::http::ClientHandle, svc}; +use linkerd_app_core::{errors::respond::L5D_PROXY_ERROR, proxy::http::ClientHandle, svc}; use std::{ future::Future, pin::Pin, diff --git a/linkerd/app/outbound/src/http/require_id_header.rs b/linkerd/app/outbound/src/http/require_id_header.rs index 39814f0cf8..869f1f35dc 100644 --- a/linkerd/app/outbound/src/http/require_id_header.rs +++ b/linkerd/app/outbound/src/http/require_id_header.rs @@ -1,6 +1,7 @@ use futures::{future, TryFutureExt}; -use linkerd_app_core::{errors::IdentityRequired, identity, svc, tls, Conditional, Error}; +use linkerd_app_core::{errors::HttpError, identity, svc, tls, Conditional, Error}; use std::task::{Context, Poll}; +use thiserror::Error; use tracing::{debug, trace}; const HEADER_NAME: &str = "l5d-require-id"; @@ -16,6 +17,13 @@ pub(super) struct RequireIdentity { inner: N, } +#[derive(Debug, Error)] +#[error("required id {required:?}; found {found:?}")] +pub(crate) struct IdentityRequired { + pub required: tls::client::ServerId, + pub found: Option, +} + // === impl NewRequireIdentity === impl NewRequireIdentity { @@ -84,10 +92,10 @@ where found = %server_id, "Identity required by header not satisfied" ); - let e = IdentityRequired { + let e = HttpError::forbidden(IdentityRequired { required: require_id.into(), found: Some(server_id.clone()), - }; + }); return future::Either::Left(future::err(e.into())); } else { trace!(required = %require_id, "Identity required by header"); @@ -95,10 +103,10 @@ where } Conditional::None(_) => { debug!(required = %require_id, "Identity required by header not satisfied"); - let e = IdentityRequired { + let e = HttpError::forbidden(IdentityRequired { required: require_id.into(), found: None, - }; + }); return future::Either::Left(future::err(e.into())); } } diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index ec369ad4a2..5d1ec75b26 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -45,11 +45,11 @@ impl Outbound { .push(svc::ConcurrencyLimitLayer::new(max_in_flight_requests)) .push(svc::FailFast::layer("HTTP Server", dispatch_timeout)) .push_spawn_buffer(buffer_capacity) - .push(rt.metrics.http_errors.clone()) + .push(rt.metrics.http_errors.to_layer()) // Tear down server connections when a peer proxy generates an error. .push(PeerProxyErrors::layer()) // Synthesizes responses for proxy errors. - .push(errors::layer()) + .push(errors::respond::layer()) // Initiates OpenCensus tracing. .push(http_tracing::server(rt.span_sink.clone(), trace_labels())) .push(http::BoxResponse::layer()), diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index d93084f65a..22e576bd0e 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -130,7 +130,12 @@ impl Outbound> { // aren't received. .push_on_service( svc::layers() - .push(rt.metrics.stack.layer(stack_labels("http", "logical"))) + .push( + rt.metrics + .proxy + .stack + .layer(stack_labels("http", "logical")), + ) .push(svc::layer::mk(svc::SpawnReady::new)) .push(svc::FailFast::layer("HTTP Logical", dispatch_timeout)) .push_spawn_buffer(buffer_capacity), @@ -204,8 +209,8 @@ impl Outbound> { // Otherwise, the inner service is always ready (because it's a router). .push(svc::ConcurrencyLimitLayer::new(max_in_flight_requests)) .push(svc::FailFast::layer("Ingress server", dispatch_timeout)) - .push(rt.metrics.http_errors.clone()) - .push(errors::layer()) + .push(rt.metrics.http_errors.to_layer()) + .push(errors::respond::layer()) .push(http_tracing::server(rt.span_sink, trace_labels())) .push(http::BoxResponse::layer()) .push(http::BoxRequest::layer()), @@ -221,14 +226,14 @@ impl Outbound> { .push(svc::BoxNewService::layer()) .push(detect::NewDetectService::layer(detect_http)) .push(transport::metrics::NewServer::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) .instrument(|a: &tcp::Accept| info_span!("ingress", orig_dst = %a.orig_dst)) .push_map_target(|a: T| { let orig_dst = Param::::param(&a); tcp::Accept::from(orig_dst) }) - .push(rt.metrics.tcp_accept_errors.layer()) + .push(rt.metrics.tcp_errors.to_layer()) .push_on_service(svc::BoxService::layer()) .push(svc::BoxNewService::layer()) .check_new_service::() diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 78f50a2526..320aef6b1c 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -10,19 +10,25 @@ pub mod endpoint; pub mod http; mod ingress; pub mod logical; +mod metrics; mod resolve; mod switch_logical; pub mod tcp; #[cfg(test)] pub(crate) mod test_util; +pub use self::metrics::Metrics; use futures::Stream; use linkerd_app_core::{ config::ProxyConfig, - io, metrics, profiles, + drain, + http_tracing::OpenCensusSink, + io, profiles, proxy::{ api_resolve::{ConcreteAddr, Metadata}, core::Resolve, + identity::LocalCrtKey, + tap, }, serve, svc::{self, stack::Param}, @@ -57,10 +63,19 @@ pub struct Config { #[derive(Clone, Debug)] pub struct Outbound { config: Config, - runtime: ProxyRuntime, + runtime: Runtime, stack: svc::Stack, } +#[derive(Clone, Debug)] +struct Runtime { + metrics: Metrics, + identity: Option, + tap: tap::Registry, + span_sink: OpenCensusSink, + drain: drain::Watch, +} + #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub struct Accept

{ pub orig_dst: OrigDstAddr, @@ -71,6 +86,13 @@ pub struct Accept

{ impl Outbound<()> { pub fn new(config: Config, runtime: ProxyRuntime) -> Self { + let runtime = Runtime { + metrics: Metrics::new(runtime.metrics), + identity: runtime.identity, + tap: runtime.tap, + span_sink: runtime.span_sink, + drain: runtime.drain, + }; Self { config, runtime, @@ -78,6 +100,10 @@ impl Outbound<()> { } } + pub fn metrics(&self) -> Metrics { + self.runtime.metrics.clone() + } + pub fn with_stack(self, stack: S) -> Outbound { self.map_stack(move |_, _, _| svc::stack(stack)) } @@ -88,10 +114,6 @@ impl Outbound { &self.config } - pub fn runtime(&self) -> &ProxyRuntime { - &self.runtime - } - pub fn into_stack(self) -> svc::Stack { self.stack } @@ -115,7 +137,7 @@ impl Outbound { /// Creates a new `Outbound` by replacing the inner stack, as modified by `f`. fn map_stack( self, - f: impl FnOnce(&Config, &ProxyRuntime, svc::Stack) -> svc::Stack, + f: impl FnOnce(&Config, &Runtime, svc::Stack) -> svc::Stack, ) -> Outbound { let stack = f(&self.config, &self.runtime, self.stack); Outbound { diff --git a/linkerd/app/outbound/src/metrics/error/http.rs b/linkerd/app/outbound/src/metrics/error/http.rs new file mode 100644 index 0000000000..5ea1194ec2 --- /dev/null +++ b/linkerd/app/outbound/src/metrics/error/http.rs @@ -0,0 +1,54 @@ +use super::ErrorKind; +use linkerd_app_core::{ + metrics::{metrics, Counter, FmtMetrics}, + svc, Error, +}; +use parking_lot::RwLock; +use std::{collections::HashMap, sync::Arc}; + +metrics! { + outbound_http_errors_total: Counter { + "The total number of inbound HTTP requests that could not be processed due to a proxy error." + } +} + +#[derive(Clone, Debug, Default)] +pub struct Http(Arc>>); + +// === impl Http === + +impl Http { + pub(crate) fn to_layer( + &self, + ) -> impl svc::layer::Layer> + Clone { + svc::stack::Monitor::layer(self.clone()) + } +} + +impl svc::stack::MonitorService for Http { + type MonitorResponse = Self; + + #[inline] + fn monitor_request(&mut self, _: &Req) -> Self::MonitorResponse { + self.clone() + } +} + +impl svc::stack::MonitorError for Http { + #[inline] + fn monitor_error(&mut self, e: &Error) { + let kind = ErrorKind::mk(&**e); + self.0.write().entry(kind).or_default().incr(); + } +} + +impl FmtMetrics for Http { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let metrics = self.0.read(); + if metrics.is_empty() { + return Ok(()); + } + outbound_http_errors_total.fmt_help(f)?; + outbound_http_errors_total.fmt_scopes(f, metrics.iter(), |c| c) + } +} diff --git a/linkerd/app/outbound/src/metrics/error/mod.rs b/linkerd/app/outbound/src/metrics/error/mod.rs new file mode 100644 index 0000000000..d4841dc97f --- /dev/null +++ b/linkerd/app/outbound/src/metrics/error/mod.rs @@ -0,0 +1,56 @@ +mod http; +mod tcp; + +pub(crate) use self::{http::Http, tcp::Tcp}; +use crate::http::IdentityRequired; +use linkerd_app_core::{ + errors::{FailFastError, ResponseTimeout}, + metrics::FmtLabels, +}; +use std::fmt; + +/// Outbound proxy error types. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +enum ErrorKind { + FailFast, + IdentityRequired, + Io, + ResponseTimeout, + Unexpected, +} + +// === impl ErrorKind === + +impl ErrorKind { + fn mk(err: &(dyn std::error::Error + 'static)) -> Self { + if err.is::() { + ErrorKind::Io + } else if err.is::() { + ErrorKind::IdentityRequired + } else if err.is::() { + ErrorKind::FailFast + } else if err.is::() { + ErrorKind::ResponseTimeout + } else if let Some(e) = err.source() { + Self::mk(e) + } else { + ErrorKind::Unexpected + } + } +} + +impl FmtLabels for ErrorKind { + fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "error=\"{}\"", + match self { + ErrorKind::FailFast => "failfast", + ErrorKind::IdentityRequired => "identity required", + ErrorKind::Io => "i/o", + ErrorKind::ResponseTimeout => "response timeout", + ErrorKind::Unexpected => "unexpected", + } + ) + } +} diff --git a/linkerd/app/outbound/src/metrics/error/tcp.rs b/linkerd/app/outbound/src/metrics/error/tcp.rs new file mode 100644 index 0000000000..8cde7fd959 --- /dev/null +++ b/linkerd/app/outbound/src/metrics/error/tcp.rs @@ -0,0 +1,81 @@ +use super::ErrorKind; +use linkerd_app_core::{ + metrics::{metrics, Counter, FmtMetrics}, + svc, + transport::{labels::TargetAddr, OrigDstAddr}, + Error, +}; +use parking_lot::RwLock; +use std::{collections::HashMap, sync::Arc}; + +metrics! { + outbound_tcp_errors_total: Counter { + "The total number of outbound TCP connections that could not be processed due to a proxy error." + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct Tcp(Arc>>); + +#[derive(Clone, Debug)] +pub(crate) struct MonitorTcp { + target_addr: TargetAddr, + registry: Tcp, +} + +// === impl Tcp === + +impl Tcp { + pub(crate) fn to_layer( + &self, + ) -> impl svc::layer::Layer> + Clone { + svc::stack::NewMonitor::layer(self.clone()) + } +} + +impl> svc::stack::MonitorNewService for Tcp { + type MonitorService = MonitorTcp; + + fn monitor(&mut self, target: &T) -> Self::MonitorService { + let OrigDstAddr(addr) = target.param(); + MonitorTcp { + target_addr: TargetAddr(addr), + registry: self.clone(), + } + } +} + +impl FmtMetrics for Tcp { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let metrics = self.0.read(); + if metrics.is_empty() { + return Ok(()); + } + outbound_tcp_errors_total.fmt_help(f)?; + outbound_tcp_errors_total.fmt_scopes(f, metrics.iter(), |c| c) + } +} + +// === impl MonitorTcp === + +impl svc::stack::MonitorService for MonitorTcp { + type MonitorResponse = Self; + + #[inline] + fn monitor_request(&mut self, _: &Req) -> Self::MonitorResponse { + self.clone() + } +} + +impl svc::stack::MonitorError for MonitorTcp { + #[inline] + fn monitor_error(&mut self, e: &Error) { + let kind = ErrorKind::mk(&**e); + self.registry + .0 + .write() + .entry((self.target_addr, kind)) + .or_default() + .incr(); + } +} diff --git a/linkerd/app/outbound/src/metrics/mod.rs b/linkerd/app/outbound/src/metrics/mod.rs new file mode 100644 index 0000000000..d905314ef5 --- /dev/null +++ b/linkerd/app/outbound/src/metrics/mod.rs @@ -0,0 +1,45 @@ +//! Outbound proxy metrics. +//! +//! While this module is very similar to `inbound::metrics`, it is bound to `outbound_`-prefixed +//! metrics and derives its labels from outbound-specific types. Eventually, we won't rely on the +//! legacy `proxy` metrics and all outbound metrics will be defined in this module. +//! +//! TODO(ver) We use a `RwLock` to store our error metrics because we don't expect these registries +//! to be updated frequently or in a performance-critical area. We should probably look to use +//! `DashMap` as we migrate other metrics registries. + +pub(crate) mod error; + +pub use linkerd_app_core::metrics::*; + +/// Holds outbound proxy metrics. +#[derive(Clone, Debug)] +pub struct Metrics { + pub(crate) http_errors: error::Http, + pub(crate) tcp_errors: error::Tcp, + + /// Holds metrics that are common to both inbound and outbound proxies. These metrics are + /// reported separately + pub(crate) proxy: Proxy, +} + +impl Metrics { + pub(crate) fn new(proxy: Proxy) -> Self { + Self { + http_errors: error::Http::default(), + tcp_errors: error::Tcp::default(), + proxy, + } + } +} + +impl FmtMetrics for Metrics { + fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.http_errors.fmt_metrics(f)?; + self.tcp_errors.fmt_metrics(f)?; + + // XXX: Proxy metrics are reported elsewhere. + + Ok(()) + } +} diff --git a/linkerd/app/outbound/src/tcp/connect.rs b/linkerd/app/outbound/src/tcp/connect.rs index 6f38258400..8c4f0326a5 100644 --- a/linkerd/app/outbound/src/tcp/connect.rs +++ b/linkerd/app/outbound/src/tcp/connect.rs @@ -66,10 +66,10 @@ impl Outbound { // ALPN negotiation indicates support. .push(OpaqueTransport::layer()) // Limits the time we wait for a connection to be established. - .push_timeout(config.proxy.connect.timeout) + .push_connect_timeout(config.proxy.connect.timeout) .push(svc::stack::BoxFuture::layer()) .push(transport::metrics::Client::layer( - rt.metrics.transport.clone(), + rt.metrics.proxy.transport.clone(), )) }) } diff --git a/linkerd/app/outbound/src/tcp/logical.rs b/linkerd/app/outbound/src/tcp/logical.rs index e5906f733a..8da0f56454 100644 --- a/linkerd/app/outbound/src/tcp/logical.rs +++ b/linkerd/app/outbound/src/tcp/logical.rs @@ -83,6 +83,7 @@ where )) .push( rt.metrics + .proxy .stack .layer(crate::stack_labels("tcp", "balancer")), ) @@ -98,6 +99,7 @@ where svc::layers() .push( rt.metrics + .proxy .stack .layer(crate::stack_labels("tcp", "logical")), ) diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index c1f863640b..0f2a3271a3 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -12,7 +12,7 @@ use linkerd_app_core::{ pub use linkerd_app_test as support; use std::{str::FromStr, time::Duration}; -pub fn default_config() -> Config { +pub(crate) fn default_config() -> Config { Config { ingress_mode: false, allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(), @@ -47,13 +47,13 @@ pub fn default_config() -> Config { } } -pub fn runtime() -> (ProxyRuntime, drain::Signal) { - let (metrics, _) = metrics::Metrics::new(std::time::Duration::from_secs(10)); +pub(crate) fn runtime() -> (ProxyRuntime, drain::Signal) { let (drain_tx, drain) = drain::channel(); let (tap, _) = tap::new(); + let (metrics, _) = metrics::Metrics::new(std::time::Duration::from_secs(10)); let runtime = ProxyRuntime { identity: None, - metrics: metrics.outbound, + metrics: metrics.proxy, tap, span_sink: None, drain, diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 6d48abc4f2..b9734f7c70 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -17,6 +17,7 @@ use linkerd_app_core::{ config::ServerConfig, control::ControlAddr, dns, drain, + metrics::FmtMetrics, svc::Param, transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, Error, ProxyRuntime, @@ -96,8 +97,6 @@ impl Config { BAdmin: Bind + Clone + 'static, BAdmin::Addrs: Param> + Param>, { - use metrics::FmtMetrics; - let Config { admin, dns, @@ -142,10 +141,23 @@ impl Config { .in_scope(|| oc_collector.build(identity, dns, metrics, client_metrics)) }?; + let runtime = ProxyRuntime { + identity: identity.local(), + metrics: metrics.proxy.clone(), + tap: tap.registry(), + span_sink: oc_collector.span_sink(), + drain: drain_rx.clone(), + }; + let inbound = Inbound::new(inbound, runtime.clone()); + let outbound = Outbound::new(outbound, runtime); + let admin = { let identity = identity.local(); - let drain = drain_rx.clone(); - let metrics = metrics.inbound.clone(); + let metrics = inbound.metrics(); + let report = inbound + .metrics() + .and_then(outbound.metrics()) + .and_then(report); info_span!("admin").in_scope(move || { admin.build( bind_admin, @@ -153,36 +165,13 @@ impl Config { report, metrics, log_level, - drain, + drain_rx, shutdown_tx, ) })? }; let dst_addr = dst.addr.clone(); - - let inbound = Inbound::new( - inbound, - ProxyRuntime { - identity: identity.local(), - metrics: metrics.inbound, - tap: tap.registry(), - span_sink: oc_collector.span_sink(), - drain: drain_rx.clone(), - }, - ); - - let outbound = Outbound::new( - outbound, - ProxyRuntime { - identity: identity.local(), - metrics: metrics.outbound, - tap: tap.registry(), - span_sink: oc_collector.span_sink(), - drain: drain_rx, - }, - ); - let gateway_stack = gateway::stack( gateway, inbound.clone(), @@ -206,8 +195,8 @@ impl Config { let identity = identity.local(); let inbound_addr = inbound_addr; let profiles = dst.profiles; - let resolve = dst.resolve; let dns = dns.resolver; + let resolve = dst.resolve; let control_metrics = metrics.control; Box::pin(async move { @@ -221,13 +210,13 @@ impl Config { .instrument(info_span!("outbound")), ); + let inbound_policies = inbound.build_policies(dns, control_metrics).await; tokio::spawn( inbound .serve( inbound_addr, inbound_listen, - dns, - control_metrics, + inbound_policies, profiles, gateway_stack, ) diff --git a/linkerd/error-metrics/Cargo.toml b/linkerd/error-metrics/Cargo.toml deleted file mode 100644 index b0e41ab96b..0000000000 --- a/linkerd/error-metrics/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "linkerd-error-metrics" -version = "0.1.0" -authors = ["Linkerd Developers "] -license = "Apache-2.0" -edition = "2018" -publish = false - - -[dependencies] -futures = { version = "0.3", default-features = false } -linkerd-metrics = { path = "../metrics" } -parking_lot = "0.11" -pin-project = "1" -tower = { version = "0.4.8", default-features = false } diff --git a/linkerd/error-metrics/src/layer.rs b/linkerd/error-metrics/src/layer.rs deleted file mode 100644 index b9f71c72b4..0000000000 --- a/linkerd/error-metrics/src/layer.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::RecordError; -use linkerd_metrics::Counter; -use parking_lot::Mutex; -use std::{collections::HashMap, hash::Hash, sync::Arc}; - -#[derive(Debug)] -pub struct RecordErrorLayer { - label: L, - errors: Arc>>, -} - -impl RecordErrorLayer { - pub(crate) fn new(label: L, errors: Arc>>) -> Self { - Self { label, errors } - } -} - -impl tower::layer::Layer for RecordErrorLayer { - type Service = RecordError; - - fn layer(&self, inner: S) -> Self::Service { - RecordError::new(self.label.clone(), self.errors.clone(), inner) - } -} - -impl Clone for RecordErrorLayer { - fn clone(&self) -> Self { - Self { - errors: self.errors.clone(), - label: self.label.clone(), - } - } -} diff --git a/linkerd/error-metrics/src/lib.rs b/linkerd/error-metrics/src/lib.rs deleted file mode 100644 index 4d0b57ec89..0000000000 --- a/linkerd/error-metrics/src/lib.rs +++ /dev/null @@ -1,66 +0,0 @@ -#![deny(warnings, rust_2018_idioms)] -#![forbid(unsafe_code)] - -mod layer; -mod service; - -pub use self::layer::RecordErrorLayer; -pub use self::service::RecordError; -pub use linkerd_metrics::FmtLabels; -use linkerd_metrics::{self as metrics, Counter, FmtMetrics}; -use parking_lot::Mutex; -use std::{collections::HashMap, fmt, hash::Hash, sync::Arc}; - -pub trait LabelError { - type Labels: FmtLabels + Hash + Eq; - - fn label_error(&self, error: &E) -> Self::Labels; -} - -pub type Metric = metrics::Metric<'static, &'static str, Counter>; - -/// Produces layers and reports results. -#[derive(Debug)] -pub struct Registry -where - K: Hash + Eq, -{ - errors: Arc>>, - metric: Metric, -} - -impl Registry { - pub fn new(metric: Metric) -> Self { - Self { - errors: Default::default(), - metric, - } - } - - pub fn layer(&self, label: L) -> RecordErrorLayer { - RecordErrorLayer::new(label, self.errors.clone()) - } -} - -impl Clone for Registry { - fn clone(&self) -> Self { - Self { - errors: self.errors.clone(), - metric: self.metric, - } - } -} - -impl FmtMetrics for Registry { - fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let errors = self.errors.lock(); - if errors.is_empty() { - return Ok(()); - } - - self.metric.fmt_help(f)?; - self.metric.fmt_scopes(f, errors.iter(), |c| c)?; - - Ok(()) - } -} diff --git a/linkerd/error-metrics/src/service.rs b/linkerd/error-metrics/src/service.rs deleted file mode 100644 index 68c0f6888d..0000000000 --- a/linkerd/error-metrics/src/service.rs +++ /dev/null @@ -1,119 +0,0 @@ -use crate::LabelError; -use futures::TryFuture; -use linkerd_metrics::{Counter, FmtLabels}; -use parking_lot::Mutex; -use pin_project::pin_project; -use std::{ - collections::HashMap, - future::Future, - hash::Hash, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -/// A middlware that records errors. -#[pin_project] -pub struct RecordError { - label: L, - errors: Errors, - #[pin] - inner: S, -} - -type Errors = Arc>>; - -impl RecordError { - pub(crate) fn new(label: L, errors: Errors, inner: S) -> Self { - RecordError { - label, - errors, - inner, - } - } -} - -impl From<(S, Errors)> for RecordError -where - K: Hash + Eq, - L: Default, -{ - fn from((inner, errors): (S, Errors)) -> Self { - RecordError { - label: L::default(), - errors, - inner, - } - } -} - -impl RecordError { - fn record(errors: &Errors, label: &L, err: &E) - where - L: LabelError + Clone, - { - let labels = label.label_error(err); - errors - .lock() - .entry(labels) - .or_insert_with(Default::default) - .incr(); - } -} - -impl tower::Service for RecordError -where - S: tower::Service, - L: LabelError + Clone, -{ - type Response = S::Response; - type Error = S::Error; - type Future = RecordError; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.inner.poll_ready(cx) { - Poll::Ready(Err(err)) => { - Self::record(&self.errors, &self.label, &err); - Poll::Ready(Err(err)) - } - poll => poll, - } - } - - fn call(&mut self, req: Req) -> Self::Future { - RecordError { - inner: self.inner.call(req), - errors: self.errors.clone(), - label: self.label.clone(), - } - } -} - -impl Future for RecordError -where - F: TryFuture, - L: LabelError + Clone, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match futures::ready!(this.inner.try_poll(cx)) { - Ok(ready) => Poll::Ready(Ok(ready)), - Err(err) => { - Self::record(&*this.errors, &*this.label, &err); - Poll::Ready(Err(err)) - } - } - } -} - -impl Clone for RecordError { - fn clone(&self) -> Self { - Self { - errors: self.errors.clone(), - label: self.label.clone(), - inner: self.inner.clone(), - } - } -} diff --git a/linkerd/server-policy/src/lib.rs b/linkerd/server-policy/src/lib.rs index de55e8e37a..5600e6eab7 100644 --- a/linkerd/server-policy/src/lib.rs +++ b/linkerd/server-policy/src/lib.rs @@ -1,25 +1,15 @@ mod network; pub use self::network::Network; -use std::{ - collections::{btree_map, BTreeMap, HashSet}, - hash::Hash, - iter::FromIterator, - sync::Arc, - time, -}; +use std::{collections::HashSet, hash::Hash, time}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct ServerPolicy { pub protocol: Protocol, pub authorizations: Vec, - pub labels: Labels, + pub name: String, } -/// Stores an ordered, cloneable set of labels. -#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] -pub struct Labels(Arc>); - #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum Protocol { Detect { timeout: time::Duration }, @@ -34,7 +24,7 @@ pub enum Protocol { pub struct Authorization { pub networks: Vec, pub authentication: Authentication, - pub labels: Labels, + pub name: String, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -52,26 +42,6 @@ pub struct Suffix { ends_with: String, } -// === impl Labels === - -impl Labels { - #[inline] - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - #[inline] - pub fn iter(&self) -> btree_map::Iter<'_, String, String> { - self.0.iter() - } -} - -impl FromIterator<(String, String)> for Labels { - fn from_iter>(iter: T) -> Self { - Self(Arc::new(iter.into_iter().collect())) - } -} - // === impl Suffix === impl From> for Suffix { diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index f6d367c09e..143b2d873a 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -13,6 +13,7 @@ mod filter; pub mod layer; mod make_thunk; mod map_target; +pub mod monitor; pub mod new_service; mod on_service; mod proxy; @@ -31,6 +32,7 @@ pub use self::{ filter::{Filter, FilterLayer, Predicate}, make_thunk::MakeThunk, map_target::{MapTarget, MapTargetLayer, MapTargetService}, + monitor::{Monitor, MonitorError, MonitorNewService, MonitorService, NewMonitor}, new_service::NewService, on_service::{OnService, OnServiceLayer}, proxy::{Proxy, ProxyService}, diff --git a/linkerd/stack/src/monitor.rs b/linkerd/stack/src/monitor.rs new file mode 100644 index 0000000000..6627c1485c --- /dev/null +++ b/linkerd/stack/src/monitor.rs @@ -0,0 +1,137 @@ +use crate::{layer, NewService, Service}; +use futures::TryFuture; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// A strategy for monitoring a NewService. +pub trait MonitorNewService { + type MonitorService; + + fn monitor(&mut self, target: &T) -> Self::MonitorService; +} + +/// A strategy for monitoring a Service. +pub trait MonitorService { + type MonitorResponse; + + /// Monitors a response. + fn monitor_request(&mut self, req: &Req) -> Self::MonitorResponse; +} + +/// A strategy for monitoring a Service's errors +pub trait MonitorError { + fn monitor_error(&mut self, err: &E); +} + +#[derive(Clone, Debug)] +pub struct NewMonitor { + monitor: M, + inner: N, +} + +#[derive(Clone, Debug)] +pub struct Monitor { + monitor: M, + inner: S, +} + +#[pin_project] +#[derive(Debug)] +pub struct MonitorFuture { + monitor: M, + #[pin] + inner: F, +} + +// === impl NewMonitor === + +impl NewMonitor { + pub fn layer(monitor: M) -> impl layer::Layer + Clone { + layer::mk(move |inner| Self { + inner, + monitor: monitor.clone(), + }) + } +} + +impl NewService for NewMonitor +where + M: MonitorNewService, + N: NewService, +{ + type Service = Monitor; + + #[inline] + fn new_service(&mut self, target: T) -> Self::Service { + let monitor = self.monitor.monitor(&target); + let inner = self.inner.new_service(target); + Monitor { monitor, inner } + } +} + +// === impl Monitor === + +impl Monitor { + pub fn layer(monitor: M) -> impl layer::Layer + Clone { + layer::mk(move |inner| Self { + inner, + monitor: monitor.clone(), + }) + } +} + +impl Service for Monitor +where + S: Service, + M: MonitorError, + M: MonitorService, + M::MonitorResponse: MonitorError, +{ + type Response = S::Response; + type Error = S::Error; + type Future = MonitorFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match futures::ready!(self.inner.poll_ready(cx)) { + Ok(()) => Poll::Ready(Ok(())), + Err(err) => { + self.monitor.monitor_error(&err); + Poll::Ready(Err(err)) + } + } + } + + #[inline] + fn call(&mut self, req: Req) -> Self::Future { + let monitor = self.monitor.monitor_request(&req); + let inner = self.inner.call(req); + MonitorFuture { monitor, inner } + } +} + +// === impl MonitorFuture === + +impl Future for MonitorFuture +where + M: MonitorError, + F: TryFuture, +{ + type Output = Result; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match futures::ready!(this.inner.try_poll(cx)) { + Ok(rsp) => Poll::Ready(Ok(rsp)), + Err(err) => { + this.monitor.monitor_error(&err); + Poll::Ready(Err(err)) + } + } + } +}