Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.11] Allow customization of netty channel handles before and during decompression #10443

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992))
- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839))
- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261))

### Dependencies
- Bump JNA version from 5.5 to 5.13 ([#9963](https://github.com/opensearch-project/OpenSearch/pull/9963))
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4;

import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.transport.Netty4BlockingPlugin;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCounted;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;

@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4HeaderVerifierIT extends OpenSearchNetty4IntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(Netty4BlockingPlugin.class);
}

public void testThatNettyHttpServerRequestBlockedWithHeaderVerifier() throws Exception {
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

final FullHttpRequest blockedRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
blockedRequest.headers().add("blockme", "Not Allowed");
blockedRequest.headers().add(HOST, "localhost");

final List<FullHttpResponse> responses = new ArrayList<>();
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
try {
FullHttpResponse blockedResponse = nettyHttpClient.send(transportAddress.address(), blockedRequest);
responses.add(blockedResponse);
String blockedResponseContent = new String(ByteBufUtil.getBytes(blockedResponse.content()), StandardCharsets.UTF_8);
assertThat(blockedResponseContent, containsString("Hit header_verifier"));
assertThat(blockedResponse.status().code(), equalTo(401));
} finally {
responses.forEach(ReferenceCounted::release);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.netty4.Netty4HttpServerTransport;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;

public class Netty4BlockingPlugin extends Netty4Plugin {

private static final AttributeKey<Boolean> SHOULD_BLOCK = AttributeKey.newInstance("should-block");

public class Netty4BlockingHttpServerTransport extends Netty4HttpServerTransport {

public Netty4BlockingHttpServerTransport(
Settings settings,
NetworkService networkService,
BigArrays bigArrays,
ThreadPool threadPool,
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
dispatcher,
clusterSettings,
sharedGroupFactory,
tracer
);
}

@Override
protected ChannelInboundHandlerAdapter createHeaderVerifier() {
return new ExampleBlockingNetty4HeaderVerifier();
}
}

@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
Settings settings,
ThreadPool threadPool,
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4BlockingHttpServerTransport(
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
new BlockingDispatcher(dispatcher),
clusterSettings,
getSharedGroupFactory(settings),
tracer
)
);
}

/** POC for how an external header verifier would be implemented */
public class ExampleBlockingNetty4HeaderVerifier extends SimpleChannelInboundHandler<DefaultHttpRequest> {

@Override
public void channelRead0(ChannelHandlerContext ctx, DefaultHttpRequest msg) throws Exception {
ReferenceCountUtil.retain(msg);
if (isBlocked(msg)) {
msg.headers().add("blocked", true);
}
ctx.fireChannelRead(msg);
}

private boolean isBlocked(HttpMessage request) {
final boolean shouldBlock = request.headers().contains("blockme");

return shouldBlock;
}
}

class BlockingDispatcher implements HttpServerTransport.Dispatcher {

private HttpServerTransport.Dispatcher originalDispatcher;

public BlockingDispatcher(final HttpServerTransport.Dispatcher originalDispatcher) {
super();
this.originalDispatcher = originalDispatcher;
}

@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
if (request.getHeaders().containsKey("blocked")) {
channel.sendResponse(new BytesRestResponse(RestStatus.UNAUTHORIZED, "Hit header_verifier"));
return;
}
originalDispatcher.dispatchRequest(request, channel, threadContext);

}

@Override
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
originalDispatcher.dispatchBadRequest(channel, threadContext, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,10 @@ public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings);
}

static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel");
static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-server-channel");
public static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel");
protected static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance(
"opensearch-http-server-channel"
);

protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

Expand Down Expand Up @@ -351,7 +353,8 @@ protected void initChannel(Channel ch) throws Exception {
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
ch.pipeline().addLast("decoder", decoder);
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("header_verifier", transport.createHeaderVerifier());
ch.pipeline().addLast("decoder_compress", transport.createDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
Expand Down Expand Up @@ -393,4 +396,21 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}
}

/**
* Extension point that allows a NetworkPlugin to extend the netty pipeline and inspect headers after request decoding
*/
protected ChannelInboundHandlerAdapter createHeaderVerifier() {
// pass-through
return new ChannelInboundHandlerAdapter();
}

/**
* Extension point that allows a NetworkPlugin to override the default netty HttpContentDecompressor and supply a custom decompressor.
*
* Used in instances to conditionally decompress depending on the outcome from header verification
*/
protected ChannelInboundHandlerAdapter createDecompressor() {
return new HttpContentDecompressor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
);
}

private SharedGroupFactory getSharedGroupFactory(Settings settings) {
SharedGroupFactory getSharedGroupFactory(Settings settings) {
SharedGroupFactory groupFactory = this.groupFactory.get();
if (groupFactory != null) {
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ default List<ReplacedRoute> replacedRoutes() {
}

/**
* Controls whether requests handled by this class are allowed to to access system indices by default.
* Controls whether requests handled by this class are allowed to access system indices by default.
* @return {@code true} if requests handled by this class should be allowed to access system indices.
*/
default boolean allowSystemIndexAccessByDefault() {
Expand Down
Loading