Skip to content

Commit

Permalink
chore(proxy/http): address hyper::client::conn::Builder deprecations (
Browse files Browse the repository at this point in the history
#3427)

this branch contains a sequence of commits that focus on addressing
deprecation warnings related to hyper::client::conn::Builder. this
branch enables the backports feature, and then replaces some of these
builders with the backported, http/2 specific,
hyper::client::conn::http2::Builder type.

relates to linkerd/linkerd2#8733.

---

* chore(proxy/http): address `h2::Connection` deprecation

this commit updates `Connection<B>` to use the backported, http/2
specific `SendRequest` type.

this commit enables the `backports` feature flag in the `hyper`
dependency.

Signed-off-by: katelyn martin <kate@buoyant.io>

* chore(proxy/http): address `server::tests` deprecations

Signed-off-by: katelyn martin <kate@buoyant.io>

* refactor(proxy/http): consolidate connect functions

`connect()` is never called elsewhere. let's avoid the misdirection and
move it into the body of `connect_h2()`.

Signed-off-by: katelyn martin <kate@buoyant.io>

---------

Signed-off-by: katelyn martin <kate@buoyant.io>
  • Loading branch information
cratelyn authored Dec 6, 2024
1 parent c8f7ca1 commit 2989101
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 44 deletions.
1 change: 1 addition & 0 deletions linkerd/proxy/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ http = "0.2"
http-body = "0.4"
httparse = "1"
hyper = { version = "0.14", features = [
"backports",
"client",
"deprecated",
"http1",
Expand Down
31 changes: 14 additions & 17 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::TracingExecutor;
use futures::prelude::*;
use hyper::{body::HttpBody, client::conn};
use hyper::body::HttpBody;
use linkerd_error::{Error, Result};
use linkerd_stack::{MakeConnection, Service};
use std::{
Expand All @@ -23,8 +23,7 @@ pub struct Connect<C, B> {

#[derive(Debug)]
pub struct Connection<B> {
#[allow(deprecated)] // linkerd/linkerd2#8733
tx: hyper::client::conn::SendRequest<B>,
tx: hyper::client::conn::http2::SendRequest<B>,
}

// === impl Connect ===
Expand Down Expand Up @@ -87,21 +86,19 @@ where
Box::pin(
async move {
let (io, _meta) = connect.err_into::<Error>().await?;
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut builder = conn::Builder::new();
builder.executor(TracingExecutor).http2_only(true);
let mut builder = hyper::client::conn::http2::Builder::new(TracingExecutor);
match flow_control {
None => {}
Some(FlowControl::Adaptive) => {
builder.http2_adaptive_window(true);
builder.adaptive_window(true);
}
Some(FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}) => {
builder
.http2_initial_stream_window_size(initial_stream_window_size)
.http2_initial_connection_window_size(initial_connection_window_size);
.initial_stream_window_size(initial_stream_window_size)
.initial_connection_window_size(initial_connection_window_size);
}
}

Expand All @@ -113,17 +110,17 @@ where
}) = keep_alive
{
builder
.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(interval)
.http2_keep_alive_while_idle(while_idle);
.keep_alive_timeout(timeout)
.keep_alive_interval(interval)
.keep_alive_while_idle(while_idle);
}

builder.http2_max_frame_size(max_frame_size);
builder.max_frame_size(max_frame_size);
if let Some(max) = max_concurrent_reset_streams {
builder.http2_max_concurrent_reset_streams(max);
builder.max_concurrent_reset_streams(max);
}
if let Some(sz) = max_send_buf_size {
builder.http2_max_send_buf_size(sz);
builder.max_send_buf_size(sz);
}

let (tx, conn) = builder
Expand Down Expand Up @@ -153,7 +150,7 @@ where
{
type Response = http::Response<hyper::Body>;
type Error = hyper::Error;
type Future = conn::ResponseFuture;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -175,6 +172,6 @@ where
*req.version_mut() = http::Version::HTTP_11;
}

self.tx.send_request(req)
self.tx.send_request(req).boxed()
}
}
46 changes: 19 additions & 27 deletions linkerd/proxy/http/src/server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::vec;

use super::*;
use bytes::Bytes;
use futures::FutureExt;
use http_body::Body;
use linkerd_stack::CloneParam;
use tokio::time;
Expand All @@ -26,10 +27,9 @@ async fn h2_connection_window_exhaustion() {
h2::ServerParams::default(),
// An HTTP/2 client with constrained connection and stream windows to
// force window exhaustion.
#[allow(deprecated)] // linkerd/linkerd2#8733
hyper::client::conn::Builder::new()
.http2_initial_connection_window_size(CLIENT_CONN_WINDOW)
.http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
hyper::client::conn::http2::Builder::new(TracingExecutor)
.initial_connection_window_size(CLIENT_CONN_WINDOW)
.initial_stream_window_size(CLIENT_STREAM_WINDOW),
)
.await;

Expand Down Expand Up @@ -100,8 +100,8 @@ async fn h2_stream_window_exhaustion() {
// A basic HTTP/2 server configuration with no overrides.
h2::ServerParams::default(),
// An HTTP/2 client with stream windows to force window exhaustion.
#[allow(deprecated)] // linkerd/linkerd2#8733
hyper::client::conn::Builder::new().http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
hyper::client::conn::http2::Builder::new(TracingExecutor)
.initial_stream_window_size(CLIENT_STREAM_WINDOW),
)
.await;

Expand Down Expand Up @@ -144,8 +144,7 @@ async fn h2_stream_window_exhaustion() {
const LOG_LEVEL: &str = "h2::proto=trace,hyper=trace,linkerd=trace,info";

struct TestServer {
#[allow(deprecated)] // linkerd/linkerd2#8733
client: hyper::client::conn::SendRequest<BoxBody>,
client: hyper::client::conn::http2::SendRequest<BoxBody>,
server: Handle,
}

Expand Down Expand Up @@ -184,8 +183,16 @@ async fn timeout<F: Future>(inner: F) -> Result<F::Output, time::error::Elapsed>

impl TestServer {
#[tracing::instrument(skip_all)]
#[allow(deprecated)] // linkerd/linkerd2#8733
async fn connect(params: Params, client: &mut hyper::client::conn::Builder) -> Self {
async fn connect_h2(
h2: h2::ServerParams,
client: &mut hyper::client::conn::http2::Builder,
) -> Self {
let params = Params {
drain: drain(),
version: Version::H2,
http2: h2,
};

// Build the HTTP server with a mocked inner service so that we can handle
// requests.
let (mock, server) = mock::pair();
Expand All @@ -196,7 +203,6 @@ impl TestServer {

// Build a real HTTP/2 client using the mocked socket.
let (client, task) = client
.executor(crate::TracingExecutor)
.handshake::<_, BoxBody>(cio)
.await
.expect("client connect");
Expand All @@ -205,21 +211,6 @@ impl TestServer {
Self { client, server }
}

#[allow(deprecated)] // linkerd/linkerd2#8733
async fn connect_h2(h2: h2::ServerParams, client: &mut hyper::client::conn::Builder) -> Self {
Self::connect(
// A basic HTTP/2 server configuration with no overrides.
Params {
drain: drain(),
version: Version::H2,
http2: h2,
},
// An HTTP/2 client with constrained connection and stream windows to accomodate
client.http2_only(true),
)
.await
}

/// Issues a request through the client to the mocked server and processes the
/// response. The mocked response body sender and the readable response body are
/// returned.
Expand All @@ -228,7 +219,8 @@ impl TestServer {
self.server.allow(1);
let mut call0 = self
.client
.send_request(http::Request::new(BoxBody::default()));
.send_request(http::Request::new(BoxBody::default()))
.boxed();
let (_req, next) = tokio::select! {
_ = (&mut call0) => unreachable!("client cannot receive a response"),
next = self.server.next_request() => next.expect("server not dropped"),
Expand Down

0 comments on commit 2989101

Please sign in to comment.