Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(app/test): address hyper deprecations in test helpers #3433

Merged
merged 10 commits into from
Dec 9, 2024
142 changes: 83 additions & 59 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use linkerd_app_core::{
errors::respond::L5D_PROXY_ERROR,
identity, io, metrics,
proxy::http,
svc::{self, NewService, Param},
svc::{self, http::TracingExecutor, NewService, Param},
tls,
transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr},
NameAddr, ProxyRuntime,
Error, NameAddr, ProxyRuntime,
};
use linkerd_app_test::connect::ConnectFuture;
use linkerd_tracing::test::trace_init;
Expand Down Expand Up @@ -47,9 +47,7 @@ where

#[tokio::test(flavor = "current_thread")]
async fn unmeshed_http1_hello_world() {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
let _trace = trace_init();
Expand Down Expand Up @@ -82,15 +80,18 @@ async fn unmeshed_http1_hello_world() {
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want, you can use core::Result so this is just .collect::<Result<Vec<()>>>()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i took a swing at this, but i think i'll opt to elide this.

i have a personal aversion to importing Result aliases that overwrite the Result<T, E> included in the standard prelude, and found that the alternate .collect::<linkerd_app_core::Result<Vec<()>>>() form didn't really golf this down much.

.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
async fn downgrade_origin_form() {
// Reproduces https://github.com/linkerd/linkerd2/issues/5298
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
Expand Down Expand Up @@ -126,14 +127,17 @@ async fn downgrade_origin_form() {
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
async fn downgrade_absolute_form() {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
Expand Down Expand Up @@ -169,7 +173,12 @@ async fn downgrade_absolute_form() {
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -211,7 +220,12 @@ async fn http1_bad_gateway_meshed_response_error_header() {
// logical error context is added.
check_error_header(rsp.headers(), "server is not listening");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -252,7 +266,12 @@ async fn http1_bad_gateway_unmeshed_response() {
"response must not contain L5D_PROXY_ERROR header"
);

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -262,9 +281,7 @@ async fn http1_connect_timeout_meshed_response_error_header() {

// Build a mock connect that sleeps longer than the default inbound
// connect timeout.
#[allow(deprecated)] // linkerd/linkerd2#8733
let server = hyper::server::conn::Http::new();
let connect = support::connect().endpoint(Target::addr(), connect_timeout(server));
let connect = support::connect().endpoint(Target::addr(), connect_timeout());

// Build a client using the connect that always sleeps so that responses
// are GATEWAY_TIMEOUT.
Expand Down Expand Up @@ -299,7 +316,12 @@ async fn http1_connect_timeout_meshed_response_error_header() {
// logical error context is added.
check_error_header(rsp.headers(), "connect timed out after 1s");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -309,9 +331,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {

// Build a mock connect that sleeps longer than the default inbound
// connect timeout.
#[allow(deprecated)] // linkerd/linkerd2#8733
let server = hyper::server::conn::Http::new();
let connect = support::connect().endpoint(Target::addr(), connect_timeout(server));
let connect = support::connect().endpoint(Target::addr(), connect_timeout());

// Build a client using the connect that always sleeps so that responses
// are GATEWAY_TIMEOUT.
Expand Down Expand Up @@ -344,7 +364,12 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {
"response must not contain L5D_PROXY_ERROR header"
);

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -386,7 +411,7 @@ async fn h2_response_meshed_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -430,7 +455,7 @@ async fn h2_response_unmeshed_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -473,7 +498,7 @@ async fn grpc_meshed_response_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -518,7 +543,7 @@ async fn grpc_unmeshed_response_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -527,9 +552,7 @@ async fn grpc_response_class() {

// Build a mock connector serves a gRPC server that returns errors.
let connect = {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http2_only(true);
let server = hyper::server::conn::http2::Builder::new(TracingExecutor);
support::connect().endpoint_fn_boxed(
Target::addr(),
grpc_status_server(server, tonic::Code::Unknown),
Expand Down Expand Up @@ -606,9 +629,8 @@ async fn grpc_response_class() {
}

#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn hello_server(
http: hyper::server::conn::Http,
server: hyper::server::conn::http1::Builder,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
let span = tracing::info_span!("hello_server", ?endpoint);
Expand All @@ -620,7 +642,8 @@ fn hello_server(
Ok::<_, io::Error>(Response::new(Body::from("Hello world!")))
});
tokio::spawn(
http.serve_connection(server_io, hello_svc)
server
.serve_connection(server_io, hello_svc)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
Expand All @@ -630,7 +653,7 @@ fn hello_server(
#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn grpc_status_server(
http: hyper::server::conn::Http,
server: hyper::server::conn::http2::Builder<TracingExecutor>,
status: tonic::Code,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
Expand All @@ -639,26 +662,30 @@ fn grpc_status_server(
tracing::info!("mock connecting");
let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(
http.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert("grpc-status", (status as u32).to_string().parse().unwrap());
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
server
.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert(
"grpc-status",
(status as u32).to_string().parse().unwrap(),
);
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
}
Expand All @@ -675,10 +702,7 @@ fn connect_error() -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
}

#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn connect_timeout(
http: hyper::server::conn::Http,
) -> Box<dyn FnMut(Remote<ServerAddr>) -> ConnectFuture + Send> {
fn connect_timeout() -> Box<dyn FnMut(Remote<ServerAddr>) -> ConnectFuture + Send> {
Box::new(move |endpoint| {
let span = tracing::info_span!("connect_timeout", ?endpoint);
Box::pin(
Expand Down
Loading
Loading