Skip to content

Commit

Permalink
factor out boilerplate
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Dec 10, 2020
1 parent 49b7530 commit 1ea73aa
Showing 1 changed file with 77 additions and 62 deletions.
139 changes: 77 additions & 62 deletions linkerd/app/outbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use crate::test_util::{
*,
};
use crate::Config;
use hyper::{body::Buf, Body, Request, Response};
use hyper::{
body::Buf,
client::conn::{Builder as ClientBuilder, SendRequest},
Body, Request, Response,
};
use linkerd2_app_core::{
drain, metrics,
proxy::{identity::Name, tap},
Expand Down Expand Up @@ -157,7 +161,7 @@ async fn profile_endpoint_propagates_conn_errors() {
tracing::info!(?res, "Server complete");
res
});
let (mut client, conn) = hyper::client::conn::Builder::new()
let (mut client, conn) = ClientBuilder::new()
.handshake(client_io)
.await
.expect("Client must connect");
Expand All @@ -172,7 +176,7 @@ async fn profile_endpoint_propagates_conn_errors() {
.await
.expect("Client must not fail")
.call(
hyper::Request::builder()
Request::builder()
.header("Host", "foo.ns1.service.cluster.local")
.body(hyper::Body::default())
.unwrap(),
Expand Down Expand Up @@ -201,15 +205,15 @@ async fn profile_endpoint_propagates_conn_errors() {
async fn unmeshed_http1_hello_world() {
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let client = hyper::client::conn::Builder::new();
let client = ClientBuilder::new();
unmeshed_hello_world(server, client).await;
}

#[tokio::test(flavor = "current_thread")]
async fn unmeshed_http2_hello_world() {
let mut server = hyper::server::conn::Http::new();
server.http2_only(true);
let mut client = hyper::client::conn::Builder::new();
let mut client = ClientBuilder::new();
client.http2_only(true);
unmeshed_hello_world(server, client).await;
}
Expand Down Expand Up @@ -242,15 +246,14 @@ async fn meshed_hello_world() {
server_settings.http2_only(true);
let connect = support::connect().endpoint_fn_boxed(ep1, hello_server(server_settings));

let profiles = profile::resolver();
let profile_tx = profiles.profile_tx(ep1);
profile_tx
.send(profile::Profile {
let profiles = profile::resolver().profile(
ep1,
profile::Profile {
opaque_protocol: false,
name: Some(svc_name.clone()),
..Default::default()
})
.expect("still listening");
},
);

let resolver = support::resolver::<Addr, support::resolver::Metadata>();
let mut dst = resolver.endpoint_tx((svc_name, ep1.port()));
Expand All @@ -260,47 +263,81 @@ async fn meshed_hello_world() {
// Build the outbound server
let (mut s, _shutdown) = build_server(cfg, profiles, resolver, connect);
let server = s.new_service(addrs);
let (mut client, bg) = connect_client(&mut ClientBuilder::new(), server).await;

let rsp = http_request(&mut client, Request::default()).await;
assert_eq!(rsp.status(), http::StatusCode::OK);
let mut body = hyper::body::aggregate(rsp.into_body())
.await
.expect("body shouldn't error");
let mut buf = vec![0u8; body.remaining()];
body.copy_to_slice(&mut buf[..]);
assert_eq!(std::str::from_utf8(&buf[..]), Ok("Hello world!"));

drop(client);
bg.await.unwrap();
}

async fn connect_client<S>(
client_settings: &mut ClientBuilder,
server: S,
) -> (
hyper::client::conn::SendRequest<Body>,
tokio::task::JoinHandle<()>,
)
where
S: svc::Service<support::io::DuplexStream> + Send + Sync + 'static,
S::Error: Into<Error>,
S::Response: std::fmt::Debug + Send + Sync + 'static,
S::Future: Send,
{
tracing::info!(settings = ?client_settings, "connecting client with");
let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(async move {
let res = server.oneshot(server_io).err_into::<Error>().await;
tracing::info!(?res, "Server complete");
res
});
let (mut client, conn) = hyper::client::conn::Builder::new()
let proxy = server
.oneshot(server_io)
.map(|res| {
let res = res.map_err(Into::into);
tracing::info!(?res, "Server complete");
res.expect("proxy failed");
})
.instrument(tracing::info_span!("proxy"));
let (client, conn) = client_settings
.handshake(client_io)
.await
.expect("Client must connect");
tokio::spawn(async move {
let res = conn.await;
tracing::info!(?res, "Client connection complete");
res
let client_bg = conn
.map(|res| {
tracing::info!(?res, "Client background complete");
res.expect("client bg task failed");
})
.instrument(tracing::info_span!("client_bg"));
let bg = tokio::spawn(async move {
tokio::join! {
proxy,
client_bg,
};
});
(client, bg)
}

#[tracing::instrument(skip(client))]
async fn http_request(client: &mut SendRequest<Body>, request: Request<Body>) -> Response<Body> {
let rsp = client
.ready_and()
.await
.expect("Client must not fail")
.call(
hyper::Request::builder()
.body(hyper::Body::default())
.unwrap(),
)
.call(request)
.await
.expect("Request must succeed");

tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::OK);
let mut body = hyper::body::aggregate(rsp.into_body())
.await
.expect("body shouldn't error");
let mut buf = vec![0u8; body.remaining()];
body.copy_to_slice(&mut buf[..]);
assert_eq!(std::str::from_utf8(&buf[..]), Ok("Hello world!"));

rsp
}

async fn unmeshed_hello_world(
server_settings: hyper::server::conn::Http,
client_settings: hyper::client::conn::Builder,
mut client_settings: ClientBuilder,
) {
let _trace = support::trace_init();

Expand All @@ -324,44 +361,22 @@ async fn unmeshed_hello_world(
// Build the outbound server
let (mut s, _shutdown) = build_server(cfg, profiles, resolver, connect);
let server = s.new_service(addrs);
let (mut client, bg) = connect_client(&mut client_settings, server).await;

let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(async move {
let res = server.oneshot(server_io).err_into::<Error>().await;
tracing::info!(?res, "Server complete");
res
});
let (mut client, conn) = client_settings
.handshake(client_io)
.await
.expect("Client must connect");
tokio::spawn(async move {
let res = conn.await;
tracing::info!(?res, "Client connection complete");
res
});

let rsp = client
.ready_and()
.await
.expect("Client must not fail")
.call(
hyper::Request::builder()
.body(hyper::Body::default())
.unwrap(),
)
.await
.expect("Request must succeed");
tracing::info!(?rsp);
let rsp = http_request(&mut client, Request::default()).await;
assert_eq!(rsp.status(), http::StatusCode::OK);
let mut body = hyper::body::aggregate(rsp.into_body())
.await
.expect("body shouldn't error");
let mut buf = vec![0u8; body.remaining()];
body.copy_to_slice(&mut buf[..]);
assert_eq!(std::str::from_utf8(&buf[..]), Ok("Hello world!"));

drop(client);
bg.await.unwrap();
}

#[tracing::instrument]
fn hello_server(http: hyper::server::conn::Http) -> impl Fn(Endpoint) -> Result<BoxedIo, Error> {
move |endpoint| {
let span = tracing::info_span!("hello_server", ?endpoint);
Expand Down

0 comments on commit 1ea73aa

Please sign in to comment.