From 0add64640c27e42a17ab3d8c32fd70089d3ac18a Mon Sep 17 00:00:00 2001 From: yawkat Date: Fri, 12 Apr 2024 10:56:46 +0200 Subject: [PATCH] Implement HTTP/2 server push Another feature for #10596. Once again no separate tests, this functionality is covered by Http2ServerPushSpec when legacy-multiplex-handlers is off. --- .../http/server/netty/NettyHttpRequest.java | 140 ++++++++++++------ .../netty/handler/Http2ServerHandler.java | 28 +++- .../netty/http2/Http2ServerPushSpec.groovy | 1 - 3 files changed, 114 insertions(+), 55 deletions(-) diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java index 2abd652833d..f6d9a5fb750 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java @@ -58,11 +58,13 @@ import io.micronaut.http.server.netty.body.ImmediateByteBody; import io.micronaut.http.server.netty.body.ImmediateMultiObjectBody; import io.micronaut.http.server.netty.body.ImmediateSingleObjectBody; +import io.micronaut.http.server.netty.handler.Http2ServerHandler; import io.micronaut.http.server.netty.multipart.NettyCompletedFileUpload; import io.micronaut.web.router.RouteMatch; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.http.DefaultFullHttpRequest; @@ -77,6 +79,8 @@ import io.netty.handler.codec.http2.DefaultHttp2PushPromiseFrame; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.codec.http2.HttpConversionUtil; @@ -480,8 +484,7 @@ private ChannelHandlerContext findConnectionHandler() { public boolean isServerPushSupported() { ChannelHandlerContext http2ConnectionHandlerContext = findConnectionHandler(); return http2ConnectionHandlerContext != null && - ((Http2ConnectionHandler) http2ConnectionHandlerContext.handler()).connection().remote().allowPushTo() && - channelHandlerContext.channel() instanceof Http2StreamChannel; + ((Http2ConnectionHandler) http2ConnectionHandlerContext.handler()).connection().remote().allowPushTo(); } @Override @@ -494,14 +497,10 @@ public PushCapableHttpRequest serverPush(@NonNull HttpRequest request) { throw new UnsupportedOperationException("Server push not supported by this client: Client is HTTP2 but does not report support for this feature"); } - if (!(channelHandlerContext.channel() instanceof Http2StreamChannel streamChannel)) { - throw new UnsupportedOperationException("Server push currently not supported by new HTTP2 server handler. Enable the micronaut.server.netty.legacy-multiplex-handlers property"); - } - URI configuredUri = request.getUri(); String scheme = configuredUri.getScheme(); if (scheme == null) { - scheme = channelHandlerContext.channel().parent().pipeline().get(SslHandler.class) == null ? SCHEME_HTTP : SCHEME_HTTPS; + scheme = connectionHandlerContext.channel().pipeline().get(SslHandler.class) == null ? SCHEME_HTTP : SCHEME_HTTPS; } String authority = configuredUri.getAuthority(); if (authority == null) { @@ -523,7 +522,15 @@ public PushCapableHttpRequest serverPush(@NonNull HttpRequest request) { } // request used to trigger our handlers - io.netty.handler.codec.http.HttpRequest inboundRequest = NettyHttpRequestBuilder.asBuilder(request).toHttpRequestWithoutBody(); + io.netty.handler.codec.http.HttpRequest inboundRequestNoBody = NettyHttpRequestBuilder.asBuilder(request).toHttpRequestWithoutBody(); + FullHttpRequest inboundRequest = new DefaultFullHttpRequest( + inboundRequestNoBody.protocolVersion(), + inboundRequestNoBody.method(), + inboundRequestNoBody.uri(), + Unpooled.EMPTY_BUFFER, + inboundRequestNoBody.headers(), + EmptyHttpHeaders.INSTANCE + ); // copy headers from our request for (Iterator> itr = headers.getNettyHeaders().iteratorCharSequence(); itr.hasNext(); ) { @@ -543,52 +550,93 @@ public PushCapableHttpRequest serverPush(@NonNull HttpRequest request) { fixedUri.toString(), inboundRequest.headers() ); - - int ourStream = streamChannel.stream().id(); - HttpPipelineBuilder.StreamPipeline originalStreamPipeline = channelHandlerContext.channel().attr(HttpPipelineBuilder.STREAM_PIPELINE_ATTRIBUTE.get()).get(); - - new Http2StreamChannelBootstrap(channelHandlerContext.channel().parent()) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(@NonNull Http2StreamChannel ch) throws Exception { - int newStream = ch.stream().id(); - - channelHandlerContext.write(new DefaultHttp2PushPromiseFrame(HttpConversionUtil.toHttp2Headers(outboundRequest, false)) - .stream(((Http2StreamChannel) channelHandlerContext.channel()).stream()) - .pushStream(ch.stream())); - - originalStreamPipeline.initializeChildPipelineForPushPromise(ch); - - inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), newStream); - inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), ourStream); - - // delay until our handling is complete - connectionHandlerContext.executor().execute(() -> { - try { - ch.pipeline().context(ChannelPipelineCustomizer.HANDLER_HTTP_DECODER).fireChannelRead(inboundRequest); - } catch (Exception e) { - LOG.warn("Failed to complete push promise", e); + Http2Headers outboundHeaders = HttpConversionUtil.toHttp2Headers(outboundRequest, false); + + if (channelHandlerContext.channel() instanceof Http2StreamChannel streamChannel) { + int ourStream = streamChannel.stream().id(); + HttpPipelineBuilder.StreamPipeline originalStreamPipeline = channelHandlerContext.channel().attr(HttpPipelineBuilder.STREAM_PIPELINE_ATTRIBUTE.get()).get(); + + new Http2StreamChannelBootstrap(channelHandlerContext.channel().parent()) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(@NonNull Http2StreamChannel ch) throws Exception { + int newStream = ch.stream().id(); + + channelHandlerContext.write(new DefaultHttp2PushPromiseFrame(outboundHeaders) + .stream(((Http2StreamChannel) channelHandlerContext.channel()).stream()) + .pushStream(ch.stream())); + + originalStreamPipeline.initializeChildPipelineForPushPromise(ch); + + inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), newStream); + inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), ourStream); + + // delay until our handling is complete + connectionHandlerContext.executor().execute(() -> { + try { + ch.pipeline().context(ChannelPipelineCustomizer.HANDLER_HTTP_DECODER).fireChannelRead(inboundRequest); + } catch (Exception e) { + LOG.warn("Failed to complete push promise", e); + } + }); + } + }) + .open() + .addListener((GenericFutureListener>) future -> { + try { + future.sync(); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); } - }); - } - }) - .open() - .addListener((GenericFutureListener>) future -> { - try { - future.sync(); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + LOG.warn("Failed to complete push promise", e); } - LOG.warn("Failed to complete push promise", e); - } - }); + }); + } else { + int ourStreamId = headers.getNettyHeaders().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text()); + if (channelHandlerContext.executor().inEventLoop()) { + serverPush0(connectionHandler, ourStreamId, inboundRequest, connectionHandlerContext, outboundHeaders); + } else { + channelHandlerContext.executor().execute(() -> serverPush0(connectionHandler, ourStreamId, inboundRequest, connectionHandlerContext, outboundHeaders)); + } + } return this; } else { throw new UnsupportedOperationException("Server push not supported by this client: Not a HTTP2 client"); } } + private static void serverPush0(Http2ConnectionHandler connectionHandler, int ourStreamId, FullHttpRequest inboundRequest, ChannelHandlerContext connectionHandlerContext, Http2Headers outboundHeaders) { + try { + Http2Stream ourStream = connectionHandler.connection().stream(ourStreamId); + if (ourStream == null) { + // this is a bit ugly. Ideally serverPush would return a Publisher that completes when the promise is sent + throw new IllegalStateException("Push promise origin stream is already gone. " + + "This can happen if serverPush() is called outside the event loop, and the primary response is sent before the push promise. " + + "Please either call serverPush() on the event loop, or delay the primary response."); + } + int newStreamId = connectionHandler.connection().local().incrementAndGetNextStreamId(); + + inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), newStreamId); + inboundRequest.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), ourStreamId); + + connectionHandler.encoder().writePushPromise(connectionHandlerContext, ourStreamId, newStreamId, outboundHeaders, 0, connectionHandlerContext.newPromise()) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + try { + ((Http2ServerHandler.ConnectionHandler) connectionHandler).handleFakeRequest(connectionHandler.connection().stream(newStreamId), inboundRequest); + } catch (Exception e) { + LOG.warn("Failed to send push promise", e); + } + } else { + LOG.warn("Failed to send push promise", future.cause()); + } + }); + } catch (Exception e) { + LOG.warn("Failed to send push promise", e); + } + } + @Override protected Charset initCharset(Charset characterEncoding) { return characterEncoding == null ? serverConfiguration.getDefaultCharset() : characterEncoding; diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java index e9105cd0086..7a43df4a6fc 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java @@ -186,7 +186,7 @@ public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int stream /** * {@link Http2ConnectionHandler} implementation containing the {@link Http2ServerHandler}. */ - static final class ConnectionHandler extends Http2ConnectionHandler { + public static final class ConnectionHandler extends Http2ConnectionHandler { private final Http2ServerHandler handler; private ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings, boolean decoupleCloseAndGoAway, boolean flushPreface, Http2ServerHandler handler) { @@ -236,16 +236,28 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent upgrade) { FullHttpRequest fhr = upgrade.upgradeRequest(); io.netty.handler.codec.http2.Http2Stream cs = connection().stream(1); - Http2ServerHandler.Http2Stream stream = handler.new Http2Stream(cs); - cs.setProperty(handler.streamKey, stream); - boolean empty = !fhr.content().isReadable(); - stream.onHeadersRead(fhr, empty); - if (!empty) { - stream.onDataRead(fhr.content(), true); - } + handleFakeRequest(cs, fhr); } super.userEventTriggered(ctx, evt); } + + /** + * Handle a request on the given stream that did not actually come in as an HTTP/2 request. + * This is used for the h2c upgrade request which is an HTTP/1.1 request that expects an + * HTTP/2 response, and for push promises where the request is initiated by the application. + * + * @param onStream The stream that the response should be sent on + * @param fhr The fake request + */ + public void handleFakeRequest(io.netty.handler.codec.http2.Http2Stream onStream, FullHttpRequest fhr) { + Http2Stream stream = handler.new Http2Stream(onStream); + onStream.setProperty(handler.streamKey, stream); + boolean empty = !fhr.content().isReadable(); + stream.onHeadersRead(fhr, empty); + if (!empty) { + stream.onDataRead(fhr.content(), true); + } + } } /** diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/http2/Http2ServerPushSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/http2/Http2ServerPushSpec.groovy index b225b3ffc95..575342e3d3a 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/http2/Http2ServerPushSpec.groovy +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/http2/Http2ServerPushSpec.groovy @@ -56,7 +56,6 @@ import java.util.concurrent.CompletableFuture @Property(name = "micronaut.server.ssl.enabled", value = "true") @Property(name = "micronaut.server.ssl.port", value = "-1") @Property(name = "micronaut.server.ssl.buildSelfSigned", value = "true") -@Property(name = "micronaut.server.netty.legacy-multiplex-handlers", value = "true") @Property(name = "spec.name", value = "Http2ServerPushSpec") class Http2ServerPushSpec extends Specification { @Inject