Skip to content

Commit

Permalink
Add configurable response compression threshold to ServerConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
nicmunroe committed Jan 30, 2018
1 parent 5cba41d commit 6331d0d
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void startup() throws CertificateException, IOException, InterruptedExcep
serverConfig.requestSecurityValidator(), serverConfig.workerChannelIdleTimeoutMillis(),
serverConfig.proxyRouterConnectTimeoutMillis(), serverConfig.incompleteHttpCallTimeoutMillis(),
serverConfig.maxOpenIncomingServerChannels(), serverConfig.isDebugChannelLifecycleLoggingEnabled(),
serverConfig.userIdHeaderKeys()
serverConfig.userIdHeaderKeys(), serverConfig.responseCompressionThresholdBytes()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
private final int maxOpenChannelsThreshold;
private final ChannelGroup openChannelsGroup;
private final boolean debugChannelLifecycleLoggingEnabled;
private final int responseCompressionThresholdBytes;

private final StreamingAsyncHttpClient streamingAsyncHttpClientForProxyRouterEndpoints;

Expand Down Expand Up @@ -338,7 +339,8 @@ public HttpChannelInitializer(SslContext sslCtx,
long incompleteHttpCallTimeoutMillis,
int maxOpenChannelsThreshold,
boolean debugChannelLifecycleLoggingEnabled,
List<String> userIdHeaderKeys) {
List<String> userIdHeaderKeys,
int responseCompressionThresholdBytes) {
if (endpoints == null || endpoints.isEmpty())
throw new IllegalArgumentException("endpoints cannot be empty");

Expand Down Expand Up @@ -418,6 +420,7 @@ public HttpChannelInitializer(SslContext sslCtx,

cachedResponseFilterHandler = (hasReqResFilters) ? new ResponseFilterHandler(requestAndResponseFilters) : null;
this.userIdHeaderKeys = userIdHeaderKeys;
this.responseCompressionThresholdBytes = responseCompressionThresholdBytes;
}

@Override
Expand Down Expand Up @@ -463,8 +466,8 @@ public void initChannel(SocketChannel ch) {
// request/response/size threshold). This must be after HttpRequestDecoder on the incoming pipeline and
// before HttpResponseEncoder on the outbound pipeline (keep in mind that "before" on outbound means
// later in the list since outbound is processed in reverse order).
// TODO: Make the threshold configurable
p.addLast(SMART_HTTP_CONTENT_COMPRESSOR_HANDLER_NAME, new SmartHttpContentCompressor(500));
p.addLast(SMART_HTTP_CONTENT_COMPRESSOR_HANDLER_NAME,
new SmartHttpContentCompressor(responseCompressionThresholdBytes));

// INBOUND - Add the "before security" RequestFilterHandler before security and even before routing
// (if we have any filters to apply). This is here before RoutingHandler so that it can intercept requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,15 @@ public void constructor_works_with_valid_args() {
int maxOpenChannelsThreshold = 1000;
boolean debugChannelLifecycleLoggingEnabled = true;
List<String> userIdHeaderKeys = mock(List.class);
int responseCompressionThresholdBytes = 5678;

// when
HttpChannelInitializer hci = new HttpChannelInitializer(
sslCtx, maxRequestSizeInBytes, endpoints, reqResFilters, longRunningTaskExecutor, riposteErrorHandler, riposteUnhandledErrorHandler,
validationService, requestContentDeserializer, responseSender, metricsListener, defaultCompletableFutureTimeoutMillis, accessLogger,
pipelineCreateHooks, requestSecurityValidator, workerChannelIdleTimeoutMillis, proxyRouterConnectTimeoutMillis,
incompleteHttpCallTimeoutMillis, maxOpenChannelsThreshold, debugChannelLifecycleLoggingEnabled, userIdHeaderKeys);
incompleteHttpCallTimeoutMillis, maxOpenChannelsThreshold, debugChannelLifecycleLoggingEnabled, userIdHeaderKeys,
responseCompressionThresholdBytes);

// then
assertThat(extractField(hci, "sslCtx"), is(sslCtx));
Expand All @@ -182,6 +184,7 @@ public void constructor_works_with_valid_args() {
assertThat(extractField(hci, "maxOpenChannelsThreshold"), is(maxOpenChannelsThreshold));
assertThat(extractField(hci, "debugChannelLifecycleLoggingEnabled"), is(debugChannelLifecycleLoggingEnabled));
assertThat(extractField(hci, "userIdHeaderKeys"), is(userIdHeaderKeys));
assertThat(extractField(hci, "responseCompressionThresholdBytes"), is(responseCompressionThresholdBytes));

StreamingAsyncHttpClient sahc = extractField(hci, "streamingAsyncHttpClientForProxyRouterEndpoints");
assertThat(extractField(sahc, "idleChannelTimeoutMillis"), is(workerChannelIdleTimeoutMillis));
Expand All @@ -206,7 +209,8 @@ public void constructor_gracefully_handles_some_null_args() {
HttpChannelInitializer hci = new HttpChannelInitializer(
null, 42, Arrays.asList(getMockEndpoint("/some/path")), null, null, mock(RiposteErrorHandler.class), mock(RiposteUnhandledErrorHandler.class),
null, null, mock(ResponseSender.class), null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);

// then
assertThat(extractField(hci, "sslCtx"), nullValue());
Expand Down Expand Up @@ -236,7 +240,8 @@ public void constructor_handles_empty_after_security_request_handlers() {
HttpChannelInitializer hci = new HttpChannelInitializer(
null, 42, Arrays.asList(getMockEndpoint("/some/path")), reqResFilters, null, mock(RiposteErrorHandler.class), mock(RiposteUnhandledErrorHandler.class),
null, null, mock(ResponseSender.class), null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);

// then
RequestFilterHandler beforeSecReqFH = extractField(hci, "beforeSecurityRequestFilterHandler");
Expand All @@ -259,7 +264,8 @@ public void constructor_handles_empty_before_security_request_handlers() {
HttpChannelInitializer hci = new HttpChannelInitializer(
null, 42, Arrays.asList(getMockEndpoint("/some/path")), reqResFilters, null, mock(RiposteErrorHandler.class), mock(RiposteUnhandledErrorHandler.class),
null, null, mock(ResponseSender.class), null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);

// then
RequestFilterHandler beforeSecReqFH = extractField(hci, "afterSecurityRequestFilterHandler");
Expand All @@ -277,7 +283,8 @@ public void constructor_throws_IllegalArgumentException_if_endpoints_is_null() {
new HttpChannelInitializer(
null, 42, null, null, null, mock(RiposteErrorHandler.class), mock(RiposteUnhandledErrorHandler.class),
null, null, mock(ResponseSender.class), null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -286,7 +293,8 @@ public void constructor_throws_IllegalArgumentException_if_endpoints_is_empty()
new HttpChannelInitializer(
null, 42, Collections.emptyList(), null, null, mock(RiposteErrorHandler.class), mock(RiposteUnhandledErrorHandler.class),
null, null, mock(ResponseSender.class), null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -295,7 +303,8 @@ public void constructor_throws_IllegalArgumentException_if_riposteErrorHandler_i
new HttpChannelInitializer(
null, 42, Arrays.asList(getMockEndpoint("/some/path")), null, null, null, mock(RiposteUnhandledErrorHandler.class),
null, null, mock(ResponseSender.class), null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -304,7 +313,8 @@ public void constructor_throws_IllegalArgumentException_if_riposteUnhandledError
new HttpChannelInitializer(
null, 42, Arrays.asList(getMockEndpoint("/some/path")), null, null, mock(RiposteErrorHandler.class), null,
null, null, mock(ResponseSender.class), null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -313,7 +323,8 @@ public void constructor_throws_IllegalArgumentException_if_responseSender_is_nul
new HttpChannelInitializer(
null, 42, Arrays.asList(getMockEndpoint("/some/path")), null, null, mock(RiposteErrorHandler.class), mock(RiposteUnhandledErrorHandler.class),
null, null, null, null, 4242L, null,
null, null, 121, 42, 321, 100, false, null);
null, null, 121, 42, 321, 100, false, null,
123);
}

private <T extends ChannelHandler> Pair<Integer, T> findChannelHandler(List<ChannelHandler> channelHandlers, Class<T> classToFind, boolean findLast) {
Expand Down Expand Up @@ -347,7 +358,7 @@ private HttpChannelInitializer basicHttpChannelInitializer(SslContext sslCtx, lo
sslCtx, 42, Arrays.asList(getMockEndpoint("/some/path")), requestAndResponseFilters, null, mock(RiposteErrorHandler.class),
mock(RiposteUnhandledErrorHandler.class), validationService, null, mock(ResponseSender.class), null, 4242L, null,
null, null, workerChannelIdleTimeoutMillis, 4200, 1234, maxOpenChannelsThreshold, debugChannelLifecycleLoggingEnabled,
null);
null, 123);
}

@Test
Expand Down Expand Up @@ -585,7 +596,7 @@ public void initChannel_adds_AccessLogStartHandler_immediately_after_DTraceStart
}

@Test
public void initChannel_adds_HttpContentCompressor_before_HttpResponseEncoder_for_outbound_handler() {
public void initChannel_adds_SmartHttpContentCompressor_before_HttpResponseEncoder_for_outbound_handler() {
// given
HttpChannelInitializer hci = basicHttpChannelInitializerNoUtilityHandlers();

Expand All @@ -596,19 +607,24 @@ public void initChannel_adds_HttpContentCompressor_before_HttpResponseEncoder_fo
ArgumentCaptor<ChannelHandler> channelHandlerArgumentCaptor = ArgumentCaptor.forClass(ChannelHandler.class);
verify(channelPipelineMock, atLeastOnce()).addLast(anyString(), channelHandlerArgumentCaptor.capture());
List<ChannelHandler> handlers = channelHandlerArgumentCaptor.getAllValues();
Pair<Integer, HttpContentCompressor> httpContentCompressor = findChannelHandler(handlers, HttpContentCompressor.class);
Pair<Integer, SmartHttpContentCompressor> httpContentCompressor = findChannelHandler(handlers, SmartHttpContentCompressor.class);
Pair<Integer, HttpResponseEncoder> httpResponseEncoder = findChannelHandler(handlers, HttpResponseEncoder.class);

assertThat(httpContentCompressor, notNullValue());
assertThat(httpResponseEncoder, notNullValue());

// HttpContentCompressor's index should be later than HttpResponseEncoder's index to verify that it comes BEFORE HttpResponseEncoder on the OUTBOUND handlers
// (since the outbound handlers are processed in reverse order).
// SmartHttpContentCompressor's index should be later than HttpResponseEncoder's index to verify that it comes
// BEFORE HttpResponseEncoder on the OUTBOUND handlers (since the outbound handlers are processed in
// reverse order).
assertThat(httpContentCompressor.getLeft(), is(greaterThan(httpResponseEncoder.getLeft())));
// Verify that SmartHttpContentCompressor's threshold is set to the specified config value.
long expectedThresholdValue = ((Integer)extractField(hci, "responseCompressionThresholdBytes")).longValue();
assertThat(extractField(httpContentCompressor.getRight(), "responseSizeThresholdBytes"),
is(expectedThresholdValue));
}

@Test
public void initChannel_adds_RequestInfoSetterHandler_after_HttpContentCompressor() {
public void initChannel_adds_RequestInfoSetterHandler_after_SmartHttpContentCompressor() {
// given
HttpChannelInitializer hci = basicHttpChannelInitializerNoUtilityHandlers();

Expand All @@ -619,7 +635,7 @@ public void initChannel_adds_RequestInfoSetterHandler_after_HttpContentCompresso
ArgumentCaptor<ChannelHandler> channelHandlerArgumentCaptor = ArgumentCaptor.forClass(ChannelHandler.class);
verify(channelPipelineMock, atLeastOnce()).addLast(anyString(), channelHandlerArgumentCaptor.capture());
List<ChannelHandler> handlers = channelHandlerArgumentCaptor.getAllValues();
Pair<Integer, HttpContentCompressor> httpContentCompressor = findChannelHandler(handlers, HttpContentCompressor.class);
Pair<Integer, SmartHttpContentCompressor> httpContentCompressor = findChannelHandler(handlers, SmartHttpContentCompressor.class);
Pair<Integer, RequestInfoSetterHandler> requestInfoSetterHandler = findChannelHandler(handlers, RequestInfoSetterHandler.class);

assertThat(httpContentCompressor, notNullValue());
Expand Down
Loading

0 comments on commit 6331d0d

Please sign in to comment.