diff --git a/linkerd/app/outbound/src/endpoint.rs b/linkerd/app/outbound/src/endpoint.rs index 5763138134..bb73e32e3b 100644 --- a/linkerd/app/outbound/src/endpoint.rs +++ b/linkerd/app/outbound/src/endpoint.rs @@ -32,13 +32,17 @@ pub struct FromMetadata { // === impl Endpoint === impl Endpoint<()> { - pub(crate) fn forward(addr: OrigDstAddr, reason: tls::NoClientTls) -> Self { + pub(crate) fn forward( + addr: OrigDstAddr, + reason: tls::NoClientTls, + opaque_protocol: bool, + ) -> Self { Self { addr: Remote(ServerAddr(addr.into())), metadata: Metadata::default(), tls: Conditional::None(reason), logical_addr: None, - opaque_protocol: false, + opaque_protocol, protocol: (), } } @@ -263,6 +267,7 @@ pub mod tests { .new_service(tcp::Endpoint::forward( OrigDstAddr(addr), tls::NoClientTls::Disabled, + false, )); let (client_io, server_io) = support::io::duplex(4096); diff --git a/linkerd/app/outbound/src/switch_logical.rs b/linkerd/app/outbound/src/switch_logical.rs index e7bea74efe..9f0205427f 100644 --- a/linkerd/app/outbound/src/switch_logical.rs +++ b/linkerd/app/outbound/src/switch_logical.rs @@ -32,10 +32,11 @@ impl Outbound { .push_switch( move |(profile, target): (Option, T)| -> Result<_, Infallible> { if let Some(rx) = profile { - // If the profile provides an endpoint, then the target is single endpoint and - // not a logical/load-balanced service. + let is_opaque = rx.is_opaque_protocol(); + + // If the profile provides an endpoint, then the target is single + // endpoint and not a logical/load-balanced service. if let Some((addr, metadata)) = rx.endpoint() { - let is_opaque = rx.is_opaque_protocol(); tracing::debug!(%is_opaque, "Profile describes an endpoint"); return Ok(svc::Either::A(Endpoint::from_metadata( addr, @@ -46,20 +47,33 @@ impl Outbound { ))); } - // Otherwise, if the profile provides a (named) logical address, then we build a + // If the profile provides a (named) logical address, then we build a // logical stack so we apply routes, traffic splits, and load balancing. if let Some(logical_addr) = rx.logical_addr() { tracing::debug!("Profile describes a logical service"); return Ok(svc::Either::B(Logical::new(logical_addr, rx))); } + + // Otherwise, if there was a profile but it didn't include an endpoint or logical + // address, create a bare endpoint from the original destination address + // using the profile-provided opaqueness. This applies for targets that + // aren't known by the destination controller that may target ports + // included in the cluster-wide default opaque list. + tracing::debug!("Unknown endpoint"); + return Ok(svc::Either::A(Endpoint::forward( + target.param(), + no_tls_reason, + is_opaque, + ))); } - // If there was no profile or it didn't include any useful metadata, create a bare - // endpoint from the original destination address. - tracing::debug!("No profile; forwarding to the original destination"); + // If there was no profile, create a bare endpoint from the original + // destination address. + tracing::debug!("No profile"); Ok(svc::Either::A(Endpoint::forward( target.param(), no_tls_reason, + false, ))) }, logical, @@ -175,4 +189,37 @@ mod tests { let (server_io, _client_io) = io::duplex(1); svc.oneshot(server_io).await.expect("service must succeed"); } + + #[tokio::test(flavor = "current_thread")] + async fn profile_neither() { + let _trace = linkerd_tracing::test::trace_init(); + + let endpoint_addr = SocketAddr::new([192, 0, 2, 20].into(), 2020); + let endpoint = { + let endpoint_addr = endpoint_addr.clone(); + move |ep: tcp::Endpoint| { + assert_eq!(ep.addr.as_ref(), &endpoint_addr); + assert!(ep.opaque_protocol, "protocol must be marked opaque"); + svc::mk(|_: io::DuplexStream| future::ok::<(), Error>(())) + } + }; + + let (rt, _shutdown) = runtime(); + let stack = Outbound::new(default_config(), rt) + .with_stack(endpoint) + .push_switch_logical(svc::Fail::<_, WrongStack>::default()) + .into_inner(); + + let (_tx, profile) = tokio::sync::watch::channel(profiles::Profile { + endpoint: None, + opaque_protocol: true, + addr: None, + ..Default::default() + }); + + let orig_dst = OrigDstAddr(endpoint_addr); + let svc = stack.new_service((Some(profile.into()), orig_dst)); + let (server_io, _client_io) = io::duplex(1); + svc.oneshot(server_io).await.expect("service must succeed"); + } } diff --git a/linkerd/service-profiles/src/client.rs b/linkerd/service-profiles/src/client.rs index 0712a641c3..be79860eff 100644 --- a/linkerd/service-profiles/src/client.rs +++ b/linkerd/service-profiles/src/client.rs @@ -10,7 +10,7 @@ use std::{ task::{Context, Poll}, }; use tonic::{body::BoxBody, client::GrpcService}; -use tracing::debug; +use tracing::{debug, trace}; /// Creates watches on service profiles. #[derive(Clone, Debug)] @@ -73,7 +73,9 @@ where Box::pin(async move { match w.spawn_watch(addr).await { Ok(rsp) => { + debug!("Resolved profile"); let rx = rsp.into_inner(); + trace!(profile = ?rx.borrow()); Ok(Some(rx.into())) } Err(status) => {