Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement HTTP/2 server push #10705

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@
import io.micronaut.http.server.netty.body.ImmediateByteBody;
import io.micronaut.http.server.netty.body.ImmediateMultiObjectBody;
import io.micronaut.http.server.netty.body.ImmediateSingleObjectBody;
import io.micronaut.http.server.netty.handler.Http2ServerHandler;
import io.micronaut.http.server.netty.multipart.NettyCompletedFileUpload;
import io.micronaut.web.router.RouteMatch;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
Expand All @@ -77,6 +79,8 @@
import io.netty.handler.codec.http2.DefaultHttp2PushPromiseFrame;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.handler.codec.http2.HttpConversionUtil;
Expand Down Expand Up @@ -480,8 +484,7 @@ private ChannelHandlerContext findConnectionHandler() {
public boolean isServerPushSupported() {
ChannelHandlerContext http2ConnectionHandlerContext = findConnectionHandler();
return http2ConnectionHandlerContext != null &&
((Http2ConnectionHandler) http2ConnectionHandlerContext.handler()).connection().remote().allowPushTo() &&
channelHandlerContext.channel() instanceof Http2StreamChannel;
((Http2ConnectionHandler) http2ConnectionHandlerContext.handler()).connection().remote().allowPushTo();
}

@Override
Expand All @@ -494,14 +497,10 @@ public PushCapableHttpRequest<T> serverPush(@NonNull HttpRequest<?> request) {
throw new UnsupportedOperationException("Server push not supported by this client: Client is HTTP2 but does not report support for this feature");
}

if (!(channelHandlerContext.channel() instanceof Http2StreamChannel streamChannel)) {
throw new UnsupportedOperationException("Server push currently not supported by new HTTP2 server handler. Enable the micronaut.server.netty.legacy-multiplex-handlers property");
}

URI configuredUri = request.getUri();
String scheme = configuredUri.getScheme();
if (scheme == null) {
scheme = channelHandlerContext.channel().parent().pipeline().get(SslHandler.class) == null ? SCHEME_HTTP : SCHEME_HTTPS;
scheme = connectionHandlerContext.channel().pipeline().get(SslHandler.class) == null ? SCHEME_HTTP : SCHEME_HTTPS;
}
String authority = configuredUri.getAuthority();
if (authority == null) {
Expand All @@ -523,7 +522,15 @@ public PushCapableHttpRequest<T> serverPush(@NonNull HttpRequest<?> request) {
}

// request used to trigger our handlers
io.netty.handler.codec.http.HttpRequest inboundRequest = NettyHttpRequestBuilder.asBuilder(request).toHttpRequestWithoutBody();
io.netty.handler.codec.http.HttpRequest inboundRequestNoBody = NettyHttpRequestBuilder.asBuilder(request).toHttpRequestWithoutBody();
FullHttpRequest inboundRequest = new DefaultFullHttpRequest(
inboundRequestNoBody.protocolVersion(),
inboundRequestNoBody.method(),
inboundRequestNoBody.uri(),
Unpooled.EMPTY_BUFFER,
inboundRequestNoBody.headers(),
EmptyHttpHeaders.INSTANCE
);

// copy headers from our request
for (Iterator<Map.Entry<CharSequence, CharSequence>> itr = headers.getNettyHeaders().iteratorCharSequence(); itr.hasNext(); ) {
Expand All @@ -543,52 +550,93 @@ public PushCapableHttpRequest<T> serverPush(@NonNull HttpRequest<?> request) {
fixedUri.toString(),
inboundRequest.headers()
);

int ourStream = streamChannel.stream().id();
HttpPipelineBuilder.StreamPipeline originalStreamPipeline = channelHandlerContext.channel().attr(HttpPipelineBuilder.STREAM_PIPELINE_ATTRIBUTE.get()).get();

new Http2StreamChannelBootstrap(channelHandlerContext.channel().parent())
.handler(new ChannelInitializer<Http2StreamChannel>() {
@Override
protected void initChannel(@NonNull Http2StreamChannel ch) throws Exception {
int newStream = ch.stream().id();

channelHandlerContext.write(new DefaultHttp2PushPromiseFrame(HttpConversionUtil.toHttp2Headers(outboundRequest, false))
.stream(((Http2StreamChannel) channelHandlerContext.channel()).stream())
.pushStream(ch.stream()));

originalStreamPipeline.initializeChildPipelineForPushPromise(ch);

inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), newStream);
inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), ourStream);

// delay until our handling is complete
connectionHandlerContext.executor().execute(() -> {
try {
ch.pipeline().context(ChannelPipelineCustomizer.HANDLER_HTTP_DECODER).fireChannelRead(inboundRequest);
} catch (Exception e) {
LOG.warn("Failed to complete push promise", e);
Http2Headers outboundHeaders = HttpConversionUtil.toHttp2Headers(outboundRequest, false);

if (channelHandlerContext.channel() instanceof Http2StreamChannel streamChannel) {
int ourStream = streamChannel.stream().id();
HttpPipelineBuilder.StreamPipeline originalStreamPipeline = channelHandlerContext.channel().attr(HttpPipelineBuilder.STREAM_PIPELINE_ATTRIBUTE.get()).get();

new Http2StreamChannelBootstrap(channelHandlerContext.channel().parent())
.handler(new ChannelInitializer<Http2StreamChannel>() {
@Override
protected void initChannel(@NonNull Http2StreamChannel ch) throws Exception {
int newStream = ch.stream().id();

channelHandlerContext.write(new DefaultHttp2PushPromiseFrame(outboundHeaders)
.stream(((Http2StreamChannel) channelHandlerContext.channel()).stream())
.pushStream(ch.stream()));

originalStreamPipeline.initializeChildPipelineForPushPromise(ch);

inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), newStream);
inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), ourStream);

// delay until our handling is complete
connectionHandlerContext.executor().execute(() -> {
try {
ch.pipeline().context(ChannelPipelineCustomizer.HANDLER_HTTP_DECODER).fireChannelRead(inboundRequest);
} catch (Exception e) {
LOG.warn("Failed to complete push promise", e);
}
});
}
})
.open()
.addListener((GenericFutureListener<Future<Http2StreamChannel>>) future -> {
try {
future.sync();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
});
}
})
.open()
.addListener((GenericFutureListener<Future<Http2StreamChannel>>) future -> {
try {
future.sync();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
LOG.warn("Failed to complete push promise", e);
}
LOG.warn("Failed to complete push promise", e);
}
});
});
} else {
int ourStreamId = headers.getNettyHeaders().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
if (channelHandlerContext.executor().inEventLoop()) {
serverPush0(connectionHandler, ourStreamId, inboundRequest, connectionHandlerContext, outboundHeaders);
} else {
channelHandlerContext.executor().execute(() -> serverPush0(connectionHandler, ourStreamId, inboundRequest, connectionHandlerContext, outboundHeaders));
}
}
return this;
} else {
throw new UnsupportedOperationException("Server push not supported by this client: Not a HTTP2 client");
}
}

private static void serverPush0(Http2ConnectionHandler connectionHandler, int ourStreamId, FullHttpRequest inboundRequest, ChannelHandlerContext connectionHandlerContext, Http2Headers outboundHeaders) {
try {
Http2Stream ourStream = connectionHandler.connection().stream(ourStreamId);
if (ourStream == null) {
// this is a bit ugly. Ideally serverPush would return a Publisher that completes when the promise is sent
throw new IllegalStateException("Push promise origin stream is already gone. " +
"This can happen if serverPush() is called outside the event loop, and the primary response is sent before the push promise. " +
"Please either call serverPush() on the event loop, or delay the primary response.");
}
int newStreamId = connectionHandler.connection().local().incrementAndGetNextStreamId();

inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), newStreamId);
inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), ourStreamId);

connectionHandler.encoder().writePushPromise(connectionHandlerContext, ourStreamId, newStreamId, outboundHeaders, 0, connectionHandlerContext.newPromise())
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
try {
((Http2ServerHandler.ConnectionHandler) connectionHandler).handleFakeRequest(connectionHandler.connection().stream(newStreamId), inboundRequest);
} catch (Exception e) {
LOG.warn("Failed to send push promise", e);
}
} else {
LOG.warn("Failed to send push promise", future.cause());
}
});
} catch (Exception e) {
LOG.warn("Failed to send push promise", e);
}
}

@Override
protected Charset initCharset(Charset characterEncoding) {
return characterEncoding == null ? serverConfiguration.getDefaultCharset() : characterEncoding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int stream
/**
* {@link Http2ConnectionHandler} implementation containing the {@link Http2ServerHandler}.
*/
static final class ConnectionHandler extends Http2ConnectionHandler {
public static final class ConnectionHandler extends Http2ConnectionHandler {
private final Http2ServerHandler handler;

private ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings, boolean decoupleCloseAndGoAway, boolean flushPreface, Http2ServerHandler handler) {
Expand Down Expand Up @@ -236,16 +236,28 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent upgrade) {
FullHttpRequest fhr = upgrade.upgradeRequest();
io.netty.handler.codec.http2.Http2Stream cs = connection().stream(1);
Http2ServerHandler.Http2Stream stream = handler.new Http2Stream(cs);
cs.setProperty(handler.streamKey, stream);
boolean empty = !fhr.content().isReadable();
stream.onHeadersRead(fhr, empty);
if (!empty) {
stream.onDataRead(fhr.content(), true);
}
handleFakeRequest(cs, fhr);
}
super.userEventTriggered(ctx, evt);
}

/**
* Handle a request on the given stream that did not actually come in as an HTTP/2 request.
* This is used for the h2c upgrade request which is an HTTP/1.1 request that expects an
* HTTP/2 response, and for push promises where the request is initiated by the application.
*
* @param onStream The stream that the response should be sent on
* @param fhr The fake request
*/
public void handleFakeRequest(io.netty.handler.codec.http2.Http2Stream onStream, FullHttpRequest fhr) {
Http2Stream stream = handler.new Http2Stream(onStream);
onStream.setProperty(handler.streamKey, stream);
boolean empty = !fhr.content().isReadable();
stream.onHeadersRead(fhr, empty);
if (!empty) {
stream.onDataRead(fhr.content(), true);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import java.util.concurrent.CompletableFuture
@Property(name = "micronaut.server.ssl.enabled", value = "true")
@Property(name = "micronaut.server.ssl.port", value = "-1")
@Property(name = "micronaut.server.ssl.buildSelfSigned", value = "true")
@Property(name = "micronaut.server.netty.legacy-multiplex-handlers", value = "true")
@Property(name = "spec.name", value = "Http2ServerPushSpec")
class Http2ServerPushSpec extends Specification {
@Inject
Expand Down
Loading