Skip to content

Commit

Permalink
Fix HTTP/2 Upgrade flow for requests with content (post/put/...) (#7105)
Browse files Browse the repository at this point in the history
* Fix HTTP/2 Upgrade flow for requests with content (post/put/...)

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Address code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Address code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Address code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

---------

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta authored Apr 11, 2023
1 parent fd6a9e6 commit a15475a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@
package org.opensearch.http.netty4;

import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCounted;
import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;

Expand All @@ -31,7 +35,7 @@ protected boolean addMockHttpTransport() {
return false; // enable http
}

public void testThatNettyHttpServerSupportsHttp2() throws Exception {
public void testThatNettyHttpServerSupportsHttp2GetUpgrades() throws Exception {
String[] requests = new String[] { "/", "/_nodes/stats", "/", "/_cluster/state", "/" };

HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
Expand All @@ -44,19 +48,42 @@ public void testThatNettyHttpServerSupportsHttp2() throws Exception {
assertThat(responses, hasSize(5));

Collection<String> opaqueIds = Netty4HttpClient.returnOpaqueIds(responses);
assertOpaqueIdsInAnyOrder(opaqueIds);
assertOpaqueIdsInAnyOrder(5, opaqueIds);
} finally {
responses.forEach(ReferenceCounted::release);
}
}
}

private void assertOpaqueIdsInAnyOrder(Collection<String> opaqueIds) {
public void testThatNettyHttpServerSupportsHttp2PostUpgrades() throws Exception {
final List<Tuple<String, CharSequence>> requests = List.of(Tuple.tuple("/_search", "{\"query\":{ \"match_all\":{}}}"));

HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) {
Collection<FullHttpResponse> responses = nettyHttpClient.post(transportAddress.address(), requests);
try {
assertThat(responses, hasSize(1));

for (FullHttpResponse response : responses) {
assertThat(response.getStatus(), equalTo(HttpResponseStatus.OK));
}

Collection<String> opaqueIds = Netty4HttpClient.returnOpaqueIds(responses);
String msg = String.format(Locale.ROOT, "Expected opaque id [0], got [%s]", opaqueIds);
assertOpaqueIdsInAnyOrder(1, opaqueIds);
} finally {
responses.forEach(ReferenceCounted::release);
}
}
}

private void assertOpaqueIdsInAnyOrder(int expected, Collection<String> opaqueIds) {
// check if opaque ids are present in any order, since for HTTP/2 we use streaming (no head of line blocking)
// and responses may come back at any order
int i = 0;
String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be in any order, got [%s]", opaqueIds);
assertThat(msg, opaqueIds, containsInAnyOrder(IntStream.range(0, 5).mapToObj(Integer::toString).toArray()));
assertThat(opaqueIds, containsInAnyOrder(IntStream.range(0, expected).mapToObj(Integer::toString).toArray()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,11 @@ public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
handlingSettings.getMaxChunkSize()
);

final HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);
final HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(
sourceCodec,
upgradeCodecFactory,
handlingSettings.getMaxContentLength()
);
final CleartextHttp2ServerUpgradeHandler cleartextUpgradeHandler = new CleartextHttp2ServerUpgradeHandler(
sourceCodec,
upgradeHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,15 @@ private List<FullHttpResponse> processRequestsWithBody(
List<Tuple<String, CharSequence>> urisAndBodies
) throws InterruptedException {
List<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
for (int i = 0; i < urisAndBodies.size(); ++i) {
final Tuple<String, CharSequence> uriAndBody = urisAndBodies.get(i);
ByteBuf content = Unpooled.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uriAndBody.v1(), content);
request.headers().add(HttpHeaderNames.HOST, "localhost");
request.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json");
request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http");
request.headers().add("X-Opaque-ID", String.valueOf(i));
requests.add(request);
}
return sendRequests(remoteAddress, requests);
Expand Down Expand Up @@ -211,7 +213,7 @@ private synchronized List<FullHttpResponse> sendRequests(final SocketAddress rem

} finally {
if (channelFuture != null) {
channelFuture.channel().close().sync();
channelFuture.channel().close().awaitUninterruptibly();
}
}

Expand Down Expand Up @@ -368,7 +370,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
request.headers().add(HttpHeaderNames.HOST, "localhost");
request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http");

ctx.channel().attr(AttributeKey.newInstance("upgrade")).set(true);
ctx.channel().attr(AttributeKey.valueOf("upgrade")).set(true);
ctx.writeAndFlush(request);
ctx.fireChannelActive();

Expand Down

0 comments on commit a15475a

Please sign in to comment.