Skip to content

Commit

Permalink
Reintroduce mandatory http pipelining support (#30820)
Browse files Browse the repository at this point in the history
This commit reintroduces 31251c9 and 63a5799. These commits introduced a
memory leak and were reverted. This commit brings those commits back
and fixes the memory leak by removing unnecessary retain method calls.
  • Loading branch information
Tim-Brooks authored May 23, 2018
1 parent a96a45c commit d7040ad
Show file tree
Hide file tree
Showing 38 changed files with 1,002 additions and 669 deletions.
10 changes: 9 additions & 1 deletion docs/reference/migration/migrate_7_0/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
[[remove-http-enabled]]
==== Http enabled setting removed

The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing
* The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing
use of the transport client. This setting has been removed, as the transport client
will be removed in the future, thus requiring HTTP to always be enabled.

[[remove-http-pipelining-setting]]
==== Http pipelining setting removed

* The setting `http.pipelining` previously allowed disabling HTTP pipelining support.
This setting has been removed, as disabling http pipelining support on the server
provided little value. The setting `http.pipelining.max_events` can still be used to
limit the number of pipelined requests in-flight.
2 changes: 0 additions & 2 deletions docs/reference/modules/http.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ and stack traces in response output. Note: When set to `false` and the `error_tr
parameter is specified, an error will be returned; when `error_trace` is not specified, a
simple message will be returned. Defaults to `true`

|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`.

|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`.

|`http.max_warning_header_count` |The maximum number of warning headers in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -59,29 +58,24 @@ final class Netty4HttpChannel extends AbstractRestChannel {
private final Netty4HttpServerTransport transport;
private final Channel channel;
private final FullHttpRequest nettyRequest;
private final HttpPipelinedRequest pipelinedRequest;
private final int sequence;
private final ThreadContext threadContext;
private final HttpHandlingSettings handlingSettings;

/**
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param pipelinedRequest If HTTP pipelining is enabled provide the corresponding pipelined request. May be null if
* HTTP pipelining is disabled.
* @param handlingSettings true iff error messages should include stack traces.
* @param threadContext the thread context for the channel
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param sequence The pipelining sequence number for this request
* @param handlingSettings true if error messages should include stack traces.
* @param threadContext the thread context for the channel
*/
Netty4HttpChannel(
final Netty4HttpServerTransport transport,
final Netty4HttpRequest request,
final HttpPipelinedRequest pipelinedRequest,
final HttpHandlingSettings handlingSettings,
final ThreadContext threadContext) {
Netty4HttpChannel(Netty4HttpServerTransport transport, Netty4HttpRequest request, int sequence, HttpHandlingSettings handlingSettings,
ThreadContext threadContext) {
super(request, handlingSettings.getDetailedErrorsEnabled());
this.transport = transport;
this.channel = request.getChannel();
this.nettyRequest = request.request();
this.pipelinedRequest = pipelinedRequest;
this.sequence = sequence;
this.threadContext = threadContext;
this.handlingSettings = handlingSettings;
}
Expand Down Expand Up @@ -129,7 +123,7 @@ public void sendResponse(RestResponse response) {
final ChannelPromise promise = channel.newPromise();

if (releaseContent) {
promise.addListener(f -> ((Releasable)content).close());
promise.addListener(f -> ((Releasable) content).close());
}

if (releaseBytesStreamOutput) {
Expand All @@ -140,13 +134,9 @@ public void sendResponse(RestResponse response) {
promise.addListener(ChannelFutureListener.CLOSE);
}

final Object msg;
if (pipelinedRequest != null) {
msg = pipelinedRequest.createHttpResponse(resp, promise);
} else {
msg = resp;
}
channel.writeAndFlush(msg, promise);
Netty4HttpResponse newResponse = new Netty4HttpResponse(sequence, resp);

channel.writeAndFlush(newResponse, promise);
releaseContent = false;
releaseBytesStreamOutput = false;
} finally {
Expand All @@ -156,9 +146,6 @@ public void sendResponse(RestResponse response) {
if (releaseBytesStreamOutput) {
bytesOutputOrNull().close();
}
if (pipelinedRequest != null) {
pipelinedRequest.release();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipeliningAggregator;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.List;

/**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
*/
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {

private final Logger logger;
private final HttpPipeliningAggregator<Netty4HttpResponse, ChannelPromise> aggregator;

/**
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
*
* @param logger for logging unexpected errors
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
* required as events cannot queue up indefinitely
*/
public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {
this.logger = logger;
this.aggregator = new HttpPipeliningAggregator<>(maxEventsHeld);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof LastHttpContent) {
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg));
ctx.fireChannelRead(pipelinedRequest);
} else {
ctx.fireChannelRead(msg);
}
}

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof Netty4HttpResponse : "Message must be type: " + Netty4HttpResponse.class;
Netty4HttpResponse response = (Netty4HttpResponse) msg;
boolean success = false;
try {
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1().getResponse(), readyResponse.v2());
}
success = true;
} catch (IllegalStateException e) {
ctx.channel().close();
} finally {
if (success == false) {
promise.setFailure(new ClosedChannelException());
}
}
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();

if (inflightResponses.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
try {
inflightResponse.v2().setFailure(closedChannelException);
} catch (RuntimeException e) {
logger.error("unexpected error while releasing pipelined http responses", e);
}
}
}
ctx.close(promise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,30 @@
import io.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.Collections;

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {

private final Netty4HttpServerTransport serverTransport;
private final HttpHandlingSettings handlingSettings;
private final boolean httpPipeliningEnabled;
private final ThreadContext threadContext;

Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport, HttpHandlingSettings handlingSettings,
ThreadContext threadContext) {
this.serverTransport = serverTransport;
this.httpPipeliningEnabled = serverTransport.pipelining;
this.handlingSettings = handlingSettings;
this.threadContext = threadContext;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final FullHttpRequest request;
final HttpPipelinedRequest pipelinedRequest;
if (this.httpPipeliningEnabled && msg instanceof HttpPipelinedRequest) {
pipelinedRequest = (HttpPipelinedRequest) msg;
request = (FullHttpRequest) pipelinedRequest.last();
} else {
pipelinedRequest = null;
request = (FullHttpRequest) msg;
}
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
final FullHttpRequest request = msg.getRequest();

boolean success = false;
try {

final FullHttpRequest copy =
Expand Down Expand Up @@ -111,7 +100,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
Netty4HttpChannel innerChannel;
try {
innerChannel =
new Netty4HttpChannel(serverTransport, httpRequest, pipelinedRequest, handlingSettings, threadContext);
new Netty4HttpChannel(serverTransport, httpRequest, msg.getSequence(), handlingSettings, threadContext);
} catch (final IllegalArgumentException e) {
if (badRequestCause == null) {
badRequestCause = e;
Expand All @@ -126,7 +115,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
copy,
ctx.channel());
innerChannel =
new Netty4HttpChannel(serverTransport, innerRequest, pipelinedRequest, handlingSettings, threadContext);
new Netty4HttpChannel(serverTransport, innerRequest, msg.getSequence(), handlingSettings, threadContext);
}
channel = innerChannel;
}
Expand All @@ -138,12 +127,9 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
} else {
serverTransport.dispatchRequest(httpRequest, channel);
}
success = true;
} finally {
// the request is otherwise released in case of dispatch
if (success == false && pipelinedRequest != null) {
pipelinedRequest.release();
}
// As we have copied the buffer, we can release the request
request.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.netty4;

import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.http.HttpPipelinedMessage;

public class Netty4HttpResponse extends HttpPipelinedMessage {

private final FullHttpResponse response;

public Netty4HttpResponse(int sequence, FullHttpResponse response) {
super(sequence);
this.response = response;
}

public FullHttpResponse getResponse() {
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
Expand Down Expand Up @@ -99,7 +98,6 @@
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
import static org.elasticsearch.http.netty4.cors.Netty4CorsHandler.ANY_ORIGIN;

Expand Down Expand Up @@ -162,8 +160,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {

protected final int workerCount;

protected final boolean pipelining;

protected final int pipeliningMaxEvents;

/**
Expand Down Expand Up @@ -204,14 +200,16 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()),
Math.toIntExact(maxChunkSize.getBytes()),
Math.toIntExact(maxHeaderSize.getBytes()),
Math.toIntExact(maxInitialLineLength.getBytes()),
SETTING_HTTP_RESET_COOKIES.get(settings),
SETTING_HTTP_COMPRESSION.get(settings),
SETTING_HTTP_COMPRESSION_LEVEL.get(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings));
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings),
pipeliningMaxEvents);

this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
Expand All @@ -226,14 +224,12 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());

this.pipelining = SETTING_PIPELINING.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.corsConfig = buildCorsConfig(settings);

logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " +
"receive_predictor[{}], max_composite_buffer_components[{}], pipelining[{}], pipelining_max_events[{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictor, maxCompositeBufferComponents,
pipelining, pipeliningMaxEvents);
"receive_predictor[{}], max_composite_buffer_components[{}], pipelining_max_events[{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength, receivePredictor, maxCompositeBufferComponents,
pipeliningMaxEvents);
}

public Settings settings() {
Expand Down Expand Up @@ -452,9 +448,7 @@ protected void initChannel(Channel ch) throws Exception {
if (SETTING_CORS_ENABLED.get(transport.settings())) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
}
if (transport.pipelining) {
ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
}
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
}

Expand Down
Loading

0 comments on commit d7040ad

Please sign in to comment.