Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed compression support for h2c protocol #4944

Merged
merged 2 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix a bug on handling an invalid array value for point type field #4900([#4900](https://github.com/opensearch-project/OpenSearch/pull/4900))
- [BUG]: Allow decommission to support delay timeout ([#4930](https://github.com/opensearch-project/OpenSearch/pull/4930))
- Fix failing test: VerifyVersionConstantsIT ([#4946](https://github.com/opensearch-project/OpenSearch/pull/4946))
- Fixed compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,22 @@

package org.opensearch.client;

import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.entity.GzipCompressingEntity;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -62,4 +71,32 @@ public void testCompressesResponseIfRequested() throws IOException {
assertEquals(SAMPLE_DOCUMENT, searchResponse.getHits().getHits()[0].getSourceAsString());
}

/**
* The default CloseableHttpAsyncClient does not support compression out of the box (so that applies to RestClient
* and RestHighLevelClient). To check the compression works on both sides, crafting the request using CloseableHttpClient
* instead which uses compression by default.
*/
public void testCompressesRequest() throws IOException, URISyntaxException {
try (CloseableHttpClient client = HttpClients.custom().build()) {
final Node node = client().getNodes().iterator().next();
final URI baseUri = new URI(node.getHost().toURI());

final HttpPut index = new HttpPut(baseUri.resolve("/company/_doc/1"));
index.setEntity(new GzipCompressingEntity(new StringEntity(SAMPLE_DOCUMENT, ContentType.APPLICATION_JSON)));
try (CloseableHttpResponse response = client.execute(index)) {
assertThat(response.getCode(), equalTo(201));
}

final HttpGet refresh = new HttpGet(baseUri.resolve("/_refresh"));
try (CloseableHttpResponse response = client.execute(refresh)) {
assertThat(response.getCode(), equalTo(200));
}

final HttpPost search = new HttpPost(baseUri.resolve("/_search"));
index.setEntity(new GzipCompressingEntity(new StringEntity("{}", ContentType.APPLICATION_JSON)));
try (CloseableHttpResponse response = client.execute(search)) {
assertThat(response.getCode(), equalTo(200));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,18 +413,19 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
// If this handler is hit then no upgrade has been attempted and the client is just talking HTTP
final ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
pipeline.replace(this, "aggregator", aggregator);
pipeline.replace(this, "decoder_compress", new HttpContentDecompressor());
Copy link
Member

@andrross andrross Oct 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked through this deeply yet so apologies in advance if this comment is not helpful...but I find this all this "replace", "addAfter", "addBefore", etc makes it very hard to reason about the state of the pipeline when this is done. I'm wondering if it is possible to simplify this.

One thing that jumps out is that we're adding handlers before "handler" (which is added above). Could that be re-written as the following where it's more of a simple chaining of handlers?

final ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(ctx.name(), "request_creator", ...
pipeline.addAfter("request_creator", "response_creator", ...
pipeline.addAfter("response_creator", "pipelining", ...
pipeline.addAfter("pipelining", "handler", ...

Also sometimes we're using the pipeline local variable and sometimes we're using ch.pipeline(). I'm assuming those refer to the same instance?

In any case, making the Netty pipeline setup as simple as possible can really help with long term maintainability.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even better, if we know that ctx.name() is the last handler in the pipeline (this handler was just added via an addLast call so I think it should be), then you can just add the new handlers via addLast calls, which I find is the simplest way to construct a pipeline.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @andrross

Also sometimes we're using the pipeline local variable and sometimes we're using ch.pipeline(). I'm assuming those refer to the same instance?

Yes, thanks for noticing, I will change that to use pipeline in this block

One thing that jumps out is that we're adding handlers before "handler" (which is added above). Could that be re-written as the following where it's more of a simple chaining of handlers?

The order of handler is super important, and in this particular block - we really need to replace the handler and restart the processing again. The whole kind of purpose of that is to process the upgrade request first and than run the original request as it have been processed the normal way.

Even better, if we know that ctx.name() is the last handler in the pipeline (this handler was just added via an addLast call so I think it should be), then you can just add the new handlers via addLast calls, which I find is the simplest way to construct a pipeline.

The ctx.name() is the current handler which we are replacing with decoder_compress, there are a few other handlers added before and/or after (by means of HttpServerUpgradeHandler and CleartextHttp2ServerUpgradeHandler).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of handler is super important

Understood. I'm trying to see if there's a more straightforward way to achieve the desired order so that the actual order is more apparent/easier to understand when reading the code. It looks to me light we add "handler" to the pipeline, then 10 lines later start adding handlers before it. Seems like that could all happen together, and potentially invert the order of adding them so that you don't have to mix "addAfter" and "addBefore".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in general it is possible to rewrite the pipeline configuration a bit, it took me a while to understand why some handlers are either not invoked or invoked when they shouldn't, luckily we have test for it now :-) (not like affraid to touch it but it took me really a lot of time today to troubleshoot) :-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough...I did start this comment with "apologies in advance" :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough...I did start this comment with "apologies in advance" :)

Thanks a lot for review


ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
pipeline.addAfter("decoder_compress", "aggregator", aggregator);
if (handlingSettings.isCompression()) {
ch.pipeline()
.addAfter("aggregator", "encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
pipeline.addAfter(
"aggregator",
"encoder_compress",
new HttpContentCompressor(handlingSettings.getCompressionLevel())
);
}
ch.pipeline().addBefore("handler", "request_creator", requestCreator);
ch.pipeline().addBefore("handler", "response_creator", responseCreator);
ch.pipeline()
.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
pipeline.addBefore("handler", "request_creator", requestCreator);
pipeline.addBefore("handler", "response_creator", responseCreator);
pipeline.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));

ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ grant codeBase "${codebase.httpcore5}" {
permission java.net.SocketPermission "*", "connect";
};

grant codeBase "${codebase.httpclient5}" {
// httpclient5 makes socket connections for rest tests
permission java.net.SocketPermission "*", "connect";
};

grant codeBase "${codebase.httpcore-nio}" {
// httpcore makes socket connections for rest tests
permission java.net.SocketPermission "*", "connect";
Expand Down