Skip to content

Commit

Permalink
Triple protocol http1 upgrade support
Browse files Browse the repository at this point in the history
  • Loading branch information
walklown committed Apr 2, 2024
1 parent a12975a commit 8238e49
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
import org.apache.dubbo.rpc.protocol.tri.h12.HttpServerAfterUpgradeHandler;
import org.apache.dubbo.rpc.protocol.tri.h12.TripleProtocolDetector;
import org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportListenerFactory;
import org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory;
Expand All @@ -48,14 +49,18 @@
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString;

import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY;
Expand Down Expand Up @@ -143,13 +148,63 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) {
}

private void configurerHttp1Handlers(URL url, List<ChannelHandler> handlers) {
handlers.add(new ChannelHandlerPretender(new HttpServerCodec()));
final HttpServerCodec sourceCodec = new HttpServerCodec();
handlers.add(new ChannelHandlerPretender(sourceCodec));
// Triple protocol http1 upgrade support
handlers.add(new ChannelHandlerPretender(new HttpServerUpgradeHandler(sourceCodec, protocol -> {
if (!AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
// Not upgrade request
return null;
}
return buildHttp2ServerUpgradeCodec(url);
})));
// If the upgrade was successful, remove the message from the output list
// so that it's not propagated to the next handler. This request will
// be propagated as a user event instead.
handlers.add(new ChannelHandlerPretender(new HttpObjectAggregator(Integer.MAX_VALUE)));
handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec()));
handlers.add(new ChannelHandlerPretender(new NettyHttp1ConnectionHandler(
url, frameworkModel, DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
}

private Http2ServerUpgradeCodec buildHttp2ServerUpgradeCodec(URL url) {
Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
.customizeConnection((connection) -> connection
.remote()
.flowController(
new TriHttp2RemoteFlowController(connection, url.getOrDefaultApplicationModel())))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings()
.headerTableSize(
config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE))
.maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
.initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE))
.maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, DEFAULT_MAX_FRAME_SIZE))
.maxHeaderListSize(
config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(SERVER_LOGGER)
.build();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer<Http2StreamChannel>() {
@Override
protected void initChannel(Http2StreamChannel ch) {
final ChannelPipeline p = ch.pipeline();
p.addLast(new NettyHttp2FrameCodec());
p.addLast(new NettyHttp2ProtocolSelectorHandler(
url, frameworkModel, GenericHttp2ServerTransportListenerFactory.INSTANCE));
}
});

return new Http2ServerUpgradeCodec(
codec,
new HttpServerAfterUpgradeHandler(),
new HttpWriteQueueHandler(),
new FlushConsolidationHandler(64, true),
new TripleServerConnectionHandler(),
handler,
new TripleTailHandler());
}

private void configurerHttp2Handlers(URL url, List<ChannelHandler> handlers) {
Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.tri.h12;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameStream;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.InboundHttpToHttp2Adapter;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;

/**
* If an upgrade occurred, the program need send a simple response via HTTP/2 on stream 1 (the stream specifically reserved
* for cleartext HTTP upgrade). However, {@link Http2FrameCodec} send 'upgradeRequest' to upgraded channel handlers by
* {@link InboundHttpToHttp2Adapter} (As it noted that this may behave strangely). So we need to distinguish the 'upgradeRequest'
* and send the response.<br/>
*
* @author <a href="mailto:warlklown@gmail.com">Walklown<a/>
* @see HttpServerUpgradeHandler
* @see Http2FrameCodec
* @see InboundHttpToHttp2Adapter
* @since 3.3.0
*/
@Sharable
public class HttpServerAfterUpgradeHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof DefaultHttp2HeadersFrame) {
DefaultHttp2HeadersFrame headersFrame = (DefaultHttp2HeadersFrame) msg;
if (headersFrame.stream().id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && headersFrame.isEndStream()) {
// upgradeRequest
sendResponse(ctx, headersFrame.stream());
return;
}
}
super.channelRead(ctx, msg);
}

/**
* Send a frame for the response status
*/
private static void sendResponse(ChannelHandlerContext ctx, Http2FrameStream stream) {
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
ctx.write(new DefaultHttp2HeadersFrame(headers).stream(stream));
ctx.write(new DefaultHttp2DataFrame(true).stream(stream));
}
}

0 comments on commit 8238e49

Please sign in to comment.