Skip to content

Commit

Permalink
Add support for automatic gzip and deflate decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
nicmunroe committed Jan 30, 2018
1 parent 5966fa2 commit 5cba41d
Show file tree
Hide file tree
Showing 15 changed files with 1,065 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.nike.riposte.server.handler.RoutingHandler;
import com.nike.riposte.server.handler.SecurityValidationHandler;
import com.nike.riposte.server.handler.SmartHttpContentCompressor;
import com.nike.riposte.server.handler.SmartHttpContentDecompressor;
import com.nike.riposte.server.hooks.PipelineCreateHook;
import com.nike.riposte.server.http.Endpoint;
import com.nike.riposte.server.http.RequestInfo;
Expand Down Expand Up @@ -128,6 +129,10 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
* The name of the {@link SmartHttpContentCompressor} handler in the pipeline.
*/
public static final String SMART_HTTP_CONTENT_COMPRESSOR_HANDLER_NAME = "SmartHttpContentCompressorHandler";
/**
* The name of the {@link SmartHttpContentDecompressor} handler in the pipeline.
*/
public static final String SMART_HTTP_CONTENT_DECOMPRESSOR_HANDLER_NAME = "SmartHttpContentDecompressorHandler";
/**
* The name of the {@link RequestInfoSetterHandler} handler in the pipeline.
*/
Expand Down Expand Up @@ -461,7 +466,22 @@ public void initChannel(SocketChannel ch) {
// TODO: Make the threshold configurable
p.addLast(SMART_HTTP_CONTENT_COMPRESSOR_HANDLER_NAME, new SmartHttpContentCompressor(500));

// INBOUND - Add RequestInfoSetterHandler to populate our request state with a RequestInfo object
// INBOUND - Add the "before security" RequestFilterHandler before security and even before routing
// (if we have any filters to apply). This is here before RoutingHandler so that it can intercept requests
// before RoutingHandler throws 404s/405s.
if (beforeSecurityRequestFilterHandler != null)
p.addLast(REQUEST_FILTER_BEFORE_SECURITY_HANDLER_NAME, beforeSecurityRequestFilterHandler);

// INBOUND - Add RoutingHandler to figure out which endpoint should handle the request and set it on our request
// state for later execution
p.addLast(ROUTING_HANDLER_NAME, new RoutingHandler(endpoints, maxRequestSizeInBytes));

// INBOUND - Add SmartHttpContentDecompressor for automatic content decompression if the request indicates it
// is compressed *and* the target endpoint (determined by the previous RoutingHandler) is one that
// is eligible for auto-decompression.
p.addLast(SMART_HTTP_CONTENT_DECOMPRESSOR_HANDLER_NAME, new SmartHttpContentDecompressor());

// INBOUND - Add RequestInfoSetterHandler to populate our RequestInfo's content.
p.addLast(REQUEST_INFO_SETTER_HANDLER_NAME, new RequestInfoSetterHandler(maxRequestSizeInBytes));
// INBOUND - Add OpenChannelLimitHandler to limit the number of open incoming server channels, but only if
// maxOpenChannelsThreshold is not -1.
Expand All @@ -470,14 +490,6 @@ public void initChannel(SocketChannel ch) {
new OpenChannelLimitHandler(openChannelsGroup, maxOpenChannelsThreshold));
}

// INBOUND - Add the RequestFilterHandler for before security (if we have any filters to apply).
if (beforeSecurityRequestFilterHandler != null)
p.addLast(REQUEST_FILTER_BEFORE_SECURITY_HANDLER_NAME, beforeSecurityRequestFilterHandler);

// INBOUND - Add RoutingHandler to figure out which endpoint should handle the request and set it on our request
// state for later execution
p.addLast(ROUTING_HANDLER_NAME, new RoutingHandler(endpoints, maxRequestSizeInBytes));

// INBOUND - Add SecurityValidationHandler to validate the RequestInfo object for the matching endpoint
p.addLast(SECURITY_VALIDATION_HANDLER_NAME, new SecurityValidationHandler(requestSecurityValidator));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class RequestFilterHandler extends BaseInboundHandlerWithTracingAndMdcSup

private final Logger logger = LoggerFactory.getLogger(this.getClass());

protected final RiposteHandlerInternalUtil handlerUtils = RiposteHandlerInternalUtil.DEFAULT_IMPL;
protected final List<RequestAndResponseFilter> filters;

public RequestFilterHandler(List<RequestAndResponseFilter> filters) {
Expand All @@ -52,10 +53,10 @@ protected RequestInfo<?> requestInfoUpdateNoNulls(RequestInfo<?> orig, RequestIn
protected PipelineContinuationBehavior handleFilterLogic(
ChannelHandlerContext ctx,
Object msg,
HttpProcessingState state,
BiFunction<RequestAndResponseFilter, RequestInfo, RequestInfo> normalFilterCall,
BiFunction<RequestAndResponseFilter, RequestInfo, Pair<RequestInfo, Optional<ResponseInfo<?>>>> shortCircuitFilterCall
) {
HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();
RequestInfo<?> currentReqInfo = state.getRequestInfo();

// Run through each filter.
Expand Down Expand Up @@ -117,25 +118,33 @@ protected PipelineContinuationBehavior handleFilterLogic(
@Override
public PipelineContinuationBehavior doChannelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();
handlerUtils.createRequestInfoFromNettyHttpRequestAndHandleStateSetupIfNecessary(
(HttpRequest)msg,
state
);

BiFunction<RequestAndResponseFilter, RequestInfo, RequestInfo> normalFilterCall =
(filter, request) -> filter.filterRequestFirstChunkNoPayload(request, ctx);

BiFunction<RequestAndResponseFilter, RequestInfo, Pair<RequestInfo, Optional<ResponseInfo<?>>>>
shortCircuitFilterCall =
(filter, request) -> filter.filterRequestFirstChunkWithOptionalShortCircuitResponse(request, ctx);

return handleFilterLogic(ctx, msg, normalFilterCall, shortCircuitFilterCall);
return handleFilterLogic(ctx, msg, state, normalFilterCall, shortCircuitFilterCall);
}

if (msg instanceof LastHttpContent) {
HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();

BiFunction<RequestAndResponseFilter, RequestInfo, RequestInfo> normalFilterCall =
(filter, request) -> filter.filterRequestLastChunkWithFullPayload(request, ctx);

BiFunction<RequestAndResponseFilter, RequestInfo, Pair<RequestInfo, Optional<ResponseInfo<?>>>>
shortCircuitFilterCall =
(filter, request) -> filter.filterRequestLastChunkWithOptionalShortCircuitResponse(request, ctx);

return handleFilterLogic(ctx, msg, normalFilterCall, shortCircuitFilterCall);
return handleFilterLogic(ctx, msg, state, normalFilterCall, shortCircuitFilterCall);
}

// Not the first or last chunk. No filters were executed, so continue normally.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.nike.riposte.server.handler;

import com.nike.riposte.server.channelpipeline.ChannelAttributes;
import com.nike.riposte.server.error.exception.InvalidHttpRequestException;
import com.nike.riposte.server.error.exception.RequestTooBigException;
import com.nike.riposte.server.handler.base.BaseInboundHandlerWithTracingAndMdcSupport;
import com.nike.riposte.server.handler.base.PipelineContinuationBehavior;
Expand All @@ -14,7 +13,6 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.ReferenceCountUtil;

Expand All @@ -28,8 +26,8 @@
* the incoming request is chunked then the later chunks are added to the stored request info via {@link
* RequestInfo#addContentChunk(HttpContent)}.
* <p/>
* This handler should come after {@link io.netty.handler.codec.http.HttpRequestDecoder} and {@link
* SmartHttpContentCompressor} in the pipeline.
* This handler should come after {@link io.netty.handler.codec.http.HttpRequestDecoder}, {@link
* SmartHttpContentCompressor}, and {@link SmartHttpContentDecompressor} in the pipeline.
*
* The request size is tracked and if it exceeds the configured global or a given endpoint's override, an exception
* will be thrown.
Expand All @@ -40,7 +38,8 @@ public class RequestInfoSetterHandler extends BaseInboundHandlerWithTracingAndMd

private static final Logger logger = LoggerFactory.getLogger(RequestInfoSetterHandler.class);

private final int globalConfiguredMaxRequestSizeInBytes;
protected final RiposteHandlerInternalUtil handlerUtils = RiposteHandlerInternalUtil.DEFAULT_IMPL;
protected final int globalConfiguredMaxRequestSizeInBytes;

public RequestInfoSetterHandler(int globalConfiguredMaxRequestSizeInBytes) {
this.globalConfiguredMaxRequestSizeInBytes = globalConfiguredMaxRequestSizeInBytes;
Expand All @@ -62,14 +61,16 @@ public PipelineContinuationBehavior doChannelRead(ChannelHandlerContext ctx, Obj

// We have a HttpProcessingState. Process the message and continue the pipeline processing.
if (msg instanceof HttpRequest) {
throwExceptionIfNotSuccessfullyDecoded((HttpRequest) msg);
RequestInfo<?> requestInfo = new RequestInfoImpl<>((HttpRequest) msg);
state.setRequestInfo(requestInfo);
// This should be done by RoutingHandler already but it doesn't hurt to double check here, and it
// keeps this handler independent in case things get refactored again in the future.
handlerUtils.createRequestInfoFromNettyHttpRequestAndHandleStateSetupIfNecessary(
(HttpRequest)msg, state
);
}
else if (msg instanceof HttpContent) {
HttpContent httpContentMsg = (HttpContent) msg;

throwExceptionIfNotSuccessfullyDecoded(httpContentMsg);
handlerUtils.throwExceptionIfNotSuccessfullyDecoded(httpContentMsg);
RequestInfo<?> requestInfo = state.getRequestInfo();
if (requestInfo == null) {
throw new IllegalStateException(
Expand Down Expand Up @@ -101,12 +102,6 @@ else if (msg instanceof HttpContent) {
}
}

private void throwExceptionIfNotSuccessfullyDecoded(HttpObject httpObject) {
if (httpObject.getDecoderResult() != null && httpObject.getDecoderResult().isFailure()) {
throw new InvalidHttpRequestException("Detected HttpObject that was not successfully decoded.", httpObject.getDecoderResult().cause());
}
}

@Override
public PipelineContinuationBehavior doExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// If this method is called, there's a chance that the HttpProcessingState does not have a RequestInfo set on it
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.nike.riposte.server.handler;

import com.nike.riposte.server.error.exception.InvalidHttpRequestException;
import com.nike.riposte.server.http.HttpProcessingState;
import com.nike.riposte.server.http.RequestInfo;
import com.nike.riposte.server.http.impl.RequestInfoImpl;

import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;

/**
* Contains utility methods for Riposte handlers. This is intentionally package-private - it is not intended for general
* usage.
*
* @author Nic Munroe
*/
class RiposteHandlerInternalUtil {

static RiposteHandlerInternalUtil DEFAULT_IMPL = new RiposteHandlerInternalUtil();

RequestInfo<?> createRequestInfoFromNettyHttpRequestAndHandleStateSetupIfNecessary(
HttpRequest httpRequest, HttpProcessingState state
) {
// If the HttpProcessingState already has a RequestInfo then we should just use that.
RequestInfo<?> requestInfo = state.getRequestInfo();
if (requestInfo != null) {
return requestInfo;
}

// No RequestInfo has been created yet. Check for an invalid Netty HttpRequest, and assuming it's good then
// generate a new RequestInfo from it and set the RequestInfo on our HttpProcessingState.
throwExceptionIfNotSuccessfullyDecoded(httpRequest);
requestInfo = new RequestInfoImpl<>(httpRequest);
state.setRequestInfo(requestInfo);

return requestInfo;
}

void throwExceptionIfNotSuccessfullyDecoded(HttpObject httpObject) {
if (httpObject.getDecoderResult() != null && httpObject.getDecoderResult().isFailure()) {
throw new InvalidHttpRequestException("Detected HttpObject that was not successfully decoded.",
httpObject.getDecoderResult().cause());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@
* checking to make sure exactly 1 endpoint matches. See {@link #findSingleEndpointForExecution(RequestInfo)} for more
* information on the error checking.
* <p/>
* This must come after {@link com.nike.riposte.server.handler.RequestInfoSetterHandler} in the pipeline.
* This must come before {@link SmartHttpContentDecompressor} in the pipeline so that it can turn off auto-decompression
* for endpoints that aren't supposed to do auto-decompression (e.g. {@link
* com.nike.riposte.server.http.ProxyRouterEndpoint}s). Consequently it must also come before {@link
* RequestInfoSetterHandler}, which means we have to do the creation of the {@link RequestInfo} from the incoming
* Netty {@link HttpRequest} message and set it on {@link HttpProcessingState} if the state didn't already have a
* {@link RequestInfo}.
*
* @author Nic Munroe
*/
public class RoutingHandler extends BaseInboundHandlerWithTracingAndMdcSupport {

private final Collection<Endpoint<?>> endpoints;
private final int globalConfiguredMaxRequestSizeInBytes;
protected final RiposteHandlerInternalUtil handlerUtils = RiposteHandlerInternalUtil.DEFAULT_IMPL;
protected final Collection<Endpoint<?>> endpoints;
protected final int globalConfiguredMaxRequestSizeInBytes;

public RoutingHandler(Collection<Endpoint<?>> endpoints, int globalMaxRequestSizeInBytes) {
if (endpoints == null || endpoints.isEmpty())
Expand Down Expand Up @@ -108,14 +114,18 @@ protected Pair<Endpoint<?>, String> findSingleEndpointForExecution(RequestInfo r
public PipelineContinuationBehavior doChannelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();
RequestInfo request = state.getRequestInfo();
RequestInfo request = handlerUtils.createRequestInfoFromNettyHttpRequestAndHandleStateSetupIfNecessary(
(HttpRequest)msg,
state
);
Pair<Endpoint<?>, String> endpointForExecution = findSingleEndpointForExecution(request);

throwExceptionIfContentLengthHeaderIsLargerThanConfiguredMaxRequestSize((HttpRequest) msg, endpointForExecution.getLeft());

request.setPathParamsBasedOnPathTemplate(endpointForExecution.getRight());

state.setEndpointForExecution(endpointForExecution.getLeft(), endpointForExecution.getRight());

throwExceptionIfContentLengthHeaderIsLargerThanConfiguredMaxRequestSize(
(HttpRequest) msg, endpointForExecution.getLeft()
);
}

return PipelineContinuationBehavior.CONTINUE;
Expand All @@ -127,7 +137,9 @@ private void throwExceptionIfContentLengthHeaderIsLargerThanConfiguredMaxRequest
if (!isMaxRequestSizeValidationDisabled(configuredMaxRequestSize)
&& HttpHeaders.isContentLengthSet(msg)
&& HttpHeaders.getContentLength(msg) > configuredMaxRequestSize) {
throw new RequestTooBigException("Content-Length header value exceeded configured max request size of " + configuredMaxRequestSize);
throw new RequestTooBigException(
"Content-Length header value exceeded configured max request size of " + configuredMaxRequestSize
);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.nike.riposte.server.handler;

import com.nike.riposte.server.channelpipeline.ChannelAttributes;
import com.nike.riposte.server.http.Endpoint;
import com.nike.riposte.server.http.HttpProcessingState;
import com.nike.riposte.server.http.ProxyRouterEndpoint;
import com.nike.riposte.server.http.RequestInfo;

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpContentDecompressor;

/**
* Extension of {@link HttpContentDecompressor} that is smart about whether it decompresses responses or not. It
* inspects the {@link HttpProcessingState#getEndpointForExecution()} to see what
* {@link Endpoint#isDecompressRequestPayloadAllowed(RequestInfo)} returns, and only allows automatic decompression if
* that methods returns true.
*
* <p>{@link ProxyRouterEndpoint}s return false by default for {@link
* ProxyRouterEndpoint#isDecompressRequestPayloadAllowed(RequestInfo)}, so they will not auto-decompress unless that
* method is overridden for a given {@link ProxyRouterEndpoint}. Other endpoint types default to true causing them to
* auto-decompress unless that method is overridden to return false.
*
* @author Nic Munroe
*/
public class SmartHttpContentDecompressor extends HttpContentDecompressor {

public SmartHttpContentDecompressor() {
super();
}

public SmartHttpContentDecompressor(boolean strict) {
super(strict);
}

@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
// We only allow decompression if the endpoint allows it.
HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();
Endpoint<?> endpoint = state.getEndpointForExecution();

if (endpointAllowsDecompression(endpoint, state)) {
return super.newContentDecoder(contentEncoding);
}

// The endpoint does not allow decompression. Return null to indicate that this handler should not
// auto-decompress this request's payload.
return null;
}

protected boolean endpointAllowsDecompression(Endpoint<?> endpoint, HttpProcessingState state) {
if (endpoint == null)
return true;

return endpoint.isDecompressRequestPayloadAllowed(state.getRequestInfo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,14 @@ public DownstreamRequestFirstChunkInfo withRelaxedHttpsValidation(boolean relaxe
public Integer maxRequestSizeInBytesOverride() {
return 0;
}

/**
* @return This returns false by default for proxy router endpoints so that the payload will reach the downstream
* target unchanged - override this method if you want the proxy router endpoint to automatically decompress the
* payload as it passes through.
*/
@Override
public boolean isDecompressRequestPayloadAllowed(RequestInfo request) {
return false;
}
}
Loading

0 comments on commit 5cba41d

Please sign in to comment.