Skip to content

Commit

Permalink
chore(proxy/http): address hyper deprecations in ServeHttp<N>
Browse files Browse the repository at this point in the history
see linkerd/linkerd2#8733 for more information on upgrading to hyper 1.0.

this commit is based upon #3456, and #3457.

this commit is also contingent upon hyperium/hyper#3796, which backports
the server connection builder's `max_pending_accept_reset_streams()`
method.

this commit addresses hyper deprecations in `ServeHttp<N>`, which
defines a reusable HTTP/1 and HTTP/2 server for the linkerd proxy.

essentially, this commit replaces the singular `Http<E>` with a pair of
http/1 and http/2 specific connection `Builder`s. method names no longer
have `http2_*` prefixes, otherwise nothing about the connection setup
has been changed. in the `Service` implementation, we delegate to the
appropriate builder based upon the protocol version.

Signed-off-by: katelyn martin <kate@buoyant.io>
  • Loading branch information
cratelyn committed Dec 13, 2024
1 parent 1f228d1 commit 78f728c
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions linkerd/proxy/http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ pub struct NewServeHttp<X, N> {
params: X,
}

/// Serves HTTP connectionswith an inner service.
/// Serves HTTP connections with an inner service.
#[derive(Clone, Debug)]
pub struct ServeHttp<N> {
version: Version,
#[allow(deprecated)] // linkerd/linkerd2#8733
server: hyper::server::conn::Http<TracingExecutor>,
http1: hyper::server::conn::http1::Builder,
http2: hyper::server::conn::http2::Builder<TracingExecutor>,
inner: N,
drain: drain::Watch,
}
Expand Down Expand Up @@ -76,36 +76,38 @@ where
max_pending_accept_reset_streams,
} = h2;

#[allow(deprecated)] // linkerd/linkerd2#8733
let mut srv = hyper::server::conn::Http::new().with_executor(TracingExecutor);
let mut http2 = hyper::server::conn::http2::Builder::new(TracingExecutor);
match flow_control {
None => {}
Some(h2::FlowControl::Adaptive) => {
srv.http2_adaptive_window(true);
http2.adaptive_window(true);
}
Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}) => {
srv.http2_initial_stream_window_size(initial_stream_window_size)
.http2_initial_connection_window_size(initial_connection_window_size);
http2
.initial_stream_window_size(initial_stream_window_size)
.initial_connection_window_size(initial_connection_window_size);
}
}

// Configure HTTP/2 PING frames
if let Some(h2::KeepAlive { timeout, interval }) = keep_alive {
srv.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(interval);
http2
.keep_alive_timeout(timeout)
.keep_alive_interval(interval);
}

srv.http2_max_concurrent_streams(max_concurrent_streams)
.http2_max_frame_size(max_frame_size)
.http2_max_pending_accept_reset_streams(max_pending_accept_reset_streams);
http2
.max_concurrent_streams(max_concurrent_streams)
.max_frame_size(max_frame_size)
.max_pending_accept_reset_streams(max_pending_accept_reset_streams);
if let Some(sz) = max_header_list_size {
srv.http2_max_header_list_size(sz);
http2.max_header_list_size(sz);
}
if let Some(sz) = max_send_buf_size {
srv.http2_max_send_buf_size(sz);
http2.max_send_buf_size(sz);
}

debug!(?version, "Creating HTTP service");
Expand All @@ -114,7 +116,8 @@ where
inner,
version,
drain,
server: srv,
http1: hyper::server::conn::http1::Builder::new(),
http2,
}
}
}
Expand Down Expand Up @@ -142,7 +145,8 @@ where
fn call(&mut self, io: I) -> Self::Future {
let version = self.version;
let drain = self.drain.clone();
let mut server = self.server.clone();
let http1 = self.http1.clone();
let http2 = self.http2.clone();

let res = io.peer_addr().map(|pa| {
let (handle, closed) = ClientHandle::new(pa);
Expand All @@ -162,10 +166,7 @@ where
BoxRequest::new(svc),
drain.clone(),
);
let mut conn = server
.http1_only(true)
.serve_connection(io, svc)
.with_upgrades();
let mut conn = http1.serve_connection(io, svc).with_upgrades();

tokio::select! {
res = &mut conn => {
Expand All @@ -186,9 +187,7 @@ where
}

Version::H2 => {
let mut conn = server
.http2_only(true)
.serve_connection(io, BoxRequest::new(svc));
let mut conn = http2.serve_connection(io, BoxRequest::new(svc));

tokio::select! {
res = &mut conn => {
Expand Down

0 comments on commit 78f728c

Please sign in to comment.