From 65edd783cb15098087104132b0016e041637951f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 1 May 2019 09:39:11 -0600 Subject: [PATCH] Support http read timeouts for transport-nio (#41466) This is related to #27260. Currently there is a setting http.read_timeout that allows users to define a read timeout for the http transport. This commit implements support for this functionality with the transport-nio plugin. The behavior here is that a repeating task will be scheduled for the interval defined. If there have been no requests received since the last run and there are no inflight requests, the channel will be closed. --- .../elasticsearch/nio/BytesWriteHandler.java | 3 + .../elasticsearch/nio/ReadWriteHandler.java | 5 + .../nio/SocketChannelContext.java | 1 + .../org/elasticsearch/nio/TaskScheduler.java | 2 +- .../netty4/Netty4HttpServerTransport.java | 9 +- .../Netty4HttpServerTransportTests.java | 8 +- .../http/nio/HttpReadWriteHandler.java | 47 +++++++- .../http/nio/NioHttpServerTransport.java | 2 +- .../http/nio/HttpReadWriteHandlerTests.java | 104 ++++++++++++------ .../elasticsearch/http/nio/NioHttpClient.java | 17 +++ .../http/nio/NioHttpServerTransportTests.java | 95 ++++++++-------- .../http/AbstractHttpServerTransport.java | 5 +- .../http/HttpHandlingSettings.java | 10 +- .../http/HttpReadTimeoutException.java | 31 ++++++ .../elasticsearch/threadpool/ThreadPool.java | 27 +++-- .../SecurityNetty4HttpServerTransport.java | 2 +- .../nio/SecurityNioHttpServerTransport.java | 2 +- 17 files changed, 263 insertions(+), 107 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/http/HttpReadTimeoutException.java diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java index 2d57faf5cb897..07333aa2eebc4 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java @@ -34,6 +34,9 @@ public WriteOperation createWriteOperation(SocketChannelContext context, Object return new FlushReadyWrite(context, (ByteBuffer[]) message, listener); } + @Override + public void channelRegistered() {} + @Override public List writeToBytes(WriteOperation writeOperation) { assert writeOperation instanceof FlushReadyWrite : "Write operation must be flush ready"; diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java index 6b8688eccfd8c..92b276ad2d6da 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java @@ -28,6 +28,11 @@ */ public interface ReadWriteHandler { + /** + * This method is called when the channel is registered with its selector. + */ + void channelRegistered(); + /** * This method is called when a message is queued with a channel. It can be called from any thread. * This method should validate that the message is a valid type and return a write operation object diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java index 1444422f7a7f6..a926bbc9710d0 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -169,6 +169,7 @@ protected FlushOperation getPendingFlush() { @Override protected void register() throws IOException { super.register(); + readWriteHandler.channelRegistered(); if (allowChannelPredicate.test(channel) == false) { closeNow = true; } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java index e197230147c8b..c460e2147986a 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java @@ -45,7 +45,7 @@ public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) { return delayedTask; } - Runnable pollTask(long relativeNanos) { + public Runnable pollTask(long relativeNanos) { DelayedTask task; while ((task = tasks.peek()) != null) { if (relativeNanos - task.deadline >= 0) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index c0dc011a06c95..356cfa0bbf99d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -45,7 +45,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -59,6 +58,7 @@ import org.elasticsearch.http.AbstractHttpServerTransport; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; +import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.netty4.cors.Netty4CorsConfig; import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder; @@ -289,12 +289,9 @@ protected void stopInternal() { } @Override - protected void onException(HttpChannel channel, Exception cause) { + public void onException(HttpChannel channel, Exception cause) { if (cause instanceof ReadTimeoutException) { - if (logger.isTraceEnabled()) { - logger.trace("Http read timeout {}", channel); - } - CloseableChannel.closeChannel(channel); + super.onException(channel, new HttpReadTimeoutException(readTimeoutMillis, cause)); } else { super.onException(channel, cause); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 63e38823acb31..bc4ebe5672ec7 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -73,8 +73,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -346,7 +346,7 @@ public void dispatchBadRequest(final RestRequest request, transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - AtomicBoolean channelClosed = new AtomicBoolean(false); + CountDownLatch channelClosedLatch = new CountDownLatch(1); Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer() { @@ -357,9 +357,9 @@ protected void initChannel(SocketChannel ch) { } }).group(group); ChannelFuture connect = clientBootstrap.connect(remoteAddress.address()); - connect.channel().closeFuture().addListener(future -> channelClosed.set(true)); + connect.channel().closeFuture().addListener(future -> channelClosedLatch.countDown()); - assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS); + assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES)); } finally { group.shutdownGracefully().await(); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 17a5c1fb97e80..7a4fbfe42aefa 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -30,31 +30,45 @@ import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsHandler; import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.ReadWriteHandler; import org.elasticsearch.nio.SocketChannelContext; +import org.elasticsearch.nio.TaskScheduler; import org.elasticsearch.nio.WriteOperation; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.LongSupplier; public class HttpReadWriteHandler implements ReadWriteHandler { private final NettyAdaptor adaptor; private final NioHttpChannel nioHttpChannel; private final NioHttpServerTransport transport; + private final TaskScheduler taskScheduler; + private final LongSupplier nanoClock; + private final long readTimeoutNanos; + private boolean channelRegistered = false; + private boolean requestSinceReadTimeoutTrigger = false; + private int inFlightRequests = 0; public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTransport transport, HttpHandlingSettings settings, - NioCorsConfig corsConfig) { + NioCorsConfig corsConfig, TaskScheduler taskScheduler, LongSupplier nanoClock) { this.nioHttpChannel = nioHttpChannel; this.transport = transport; + this.taskScheduler = taskScheduler; + this.nanoClock = nanoClock; + this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis()); List handlers = new ArrayList<>(5); HttpRequestDecoder decoder = new HttpRequestDecoder(settings.getMaxInitialLineLength(), settings.getMaxHeaderSize(), @@ -77,10 +91,21 @@ public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTranspor } @Override - public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException { + public void channelRegistered() { + channelRegistered = true; + if (readTimeoutNanos > 0) { + scheduleReadTimeout(); + } + } + + @Override + public int consumeReads(InboundChannelBuffer channelBuffer) { + assert channelRegistered : "channelRegistered should have been called"; int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex())); Object message; while ((message = adaptor.pollInboundMessage()) != null) { + ++inFlightRequests; + requestSinceReadTimeoutTrigger = true; handleRequest(message); } @@ -96,6 +121,11 @@ public WriteOperation createWriteOperation(SocketChannelContext context, Object @Override public List writeToBytes(WriteOperation writeOperation) { + assert writeOperation.getObject() instanceof NioHttpResponse : "This channel only supports messages that are of type: " + + NioHttpResponse.class + ". Found type: " + writeOperation.getObject().getClass() + "."; + assert channelRegistered : "channelRegistered should have been called"; + --inFlightRequests; + assert inFlightRequests >= 0 : "Inflight requests should never drop below zero, found: " + inFlightRequests; adaptor.write(writeOperation); return pollFlushOperations(); } @@ -152,4 +182,17 @@ private void handleRequest(Object msg) { request.release(); } } + + private void maybeReadTimeout() { + if (requestSinceReadTimeoutTrigger == false && inFlightRequests == 0) { + transport.onException(nioHttpChannel, new HttpReadTimeoutException(TimeValue.nsecToMSec(readTimeoutNanos))); + } else { + requestSinceReadTimeoutTrigger = false; + scheduleReadTimeout(); + } + } + + private void scheduleReadTimeout() { + taskScheduler.scheduleAtRelativeTime(this::maybeReadTimeout, nanoClock.getAsLong() + readTimeoutNanos); + } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index 57936ff70c628..2730cb6d3a9b3 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -211,7 +211,7 @@ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) return new Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel,NioHttpServerTransport.this, - handlingSettings, corsConfig); + handlingSettings, corsConfig, selector.getTaskScheduler(), threadPool::relativeTimeInMillis); Consumer exceptionHandler = (e) -> onException(httpChannel, e); SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline, new InboundChannelBuffer(pageSupplier)); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java index d7e61f21173bb..3d136705ef9d9 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java @@ -33,12 +33,13 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; - import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; +import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.http.HttpTransportSettings; @@ -48,6 +49,7 @@ import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.SocketChannelContext; +import org.elasticsearch.nio.TaskScheduler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -56,6 +58,8 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.function.BiConsumer; @@ -63,19 +67,14 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION_LEVEL; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -84,8 +83,9 @@ public class HttpReadWriteHandlerTests extends ESTestCase { private HttpReadWriteHandler handler; - private NioHttpChannel nioHttpChannel; + private NioHttpChannel channel; private NioHttpServerTransport transport; + private TaskScheduler taskScheduler; private final RequestEncoder requestEncoder = new RequestEncoder(); private final ResponseDecoder responseDecoder = new ResponseDecoder(); @@ -93,22 +93,14 @@ public class HttpReadWriteHandlerTests extends ESTestCase { @Before public void setMocks() { transport = mock(NioHttpServerTransport.class); - Settings settings = Settings.EMPTY; - ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.getDefault(settings); - ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.getDefault(settings); - ByteSizeValue maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.getDefault(settings); - HttpHandlingSettings httpHandlingSettings = new HttpHandlingSettings(1024, - Math.toIntExact(maxChunkSize.getBytes()), - Math.toIntExact(maxHeaderSize.getBytes()), - Math.toIntExact(maxInitialLineLength.getBytes()), - SETTING_HTTP_RESET_COOKIES.getDefault(settings), - SETTING_HTTP_COMPRESSION.getDefault(settings), - SETTING_HTTP_COMPRESSION_LEVEL.getDefault(settings), - SETTING_HTTP_DETAILED_ERRORS_ENABLED.getDefault(settings), - SETTING_PIPELINING_MAX_EVENTS.getDefault(settings), - SETTING_CORS_ENABLED.getDefault(settings)); - nioHttpChannel = mock(NioHttpChannel.class); - handler = new HttpReadWriteHandler(nioHttpChannel, transport, httpHandlingSettings, NioCorsConfigBuilder.forAnyOrigin().build()); + Settings settings = Settings.builder().put(SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(1024)).build(); + HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings); + channel = mock(NioHttpChannel.class); + taskScheduler = mock(TaskScheduler.class); + + NioCorsConfig corsConfig = NioCorsConfigBuilder.forAnyOrigin().build(); + handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, corsConfig, taskScheduler, System::nanoTime); + handler.channelRegistered(); } public void testSuccessfulDecodeHttpRequest() throws IOException { @@ -188,7 +180,7 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t flushOperation.getListener().accept(null, null); // Since we have keep-alive set to false, we should close the channel after the response has been // flushed - verify(nioHttpChannel).close(); + verify(channel).close(); } finally { response.release(); } @@ -335,10 +327,59 @@ public void testThatAnyOriginWorks() throws IOException { } } - private FullHttpResponse executeCorsRequest(final Settings settings, final String originValue, final String host) throws IOException { + @SuppressWarnings("unchecked") + public void testReadTimeout() throws IOException { + TimeValue timeValue = TimeValue.timeValueMillis(500); + Settings settings = Settings.builder().put(SETTING_HTTP_READ_TIMEOUT.getKey(), timeValue).build(); HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings); - NioCorsConfig nioCorsConfig = NioHttpServerTransport.buildCorsConfig(settings); - HttpReadWriteHandler handler = new HttpReadWriteHandler(nioHttpChannel, transport, httpHandlingSettings, nioCorsConfig); + DefaultFullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + NioHttpRequest nioHttpRequest = new NioHttpRequest(nettyRequest, 0); + NioHttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); + httpResponse.addHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), "0"); + + NioCorsConfig corsConfig = NioCorsConfigBuilder.forAnyOrigin().build(); + TaskScheduler taskScheduler = new TaskScheduler(); + + Iterator timeValues = Arrays.asList(0, 2, 4, 6, 8).iterator(); + handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, corsConfig, taskScheduler, timeValues::next); + handler.channelRegistered(); + + prepareHandlerForResponse(handler); + SocketChannelContext context = mock(SocketChannelContext.class); + HttpWriteOperation writeOperation = new HttpWriteOperation(context, httpResponse, mock(BiConsumer.class)); + handler.writeToBytes(writeOperation); + + taskScheduler.pollTask(timeValue.getNanos() + 1).run(); + // There was a read. Do not close. + verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class)); + + prepareHandlerForResponse(handler); + prepareHandlerForResponse(handler); + + taskScheduler.pollTask(timeValue.getNanos() + 3).run(); + // There was a read. Do not close. + verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class)); + + handler.writeToBytes(writeOperation); + + taskScheduler.pollTask(timeValue.getNanos() + 5).run(); + // There has not been a read, however there is still an inflight request. Do not close. + verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class)); + + handler.writeToBytes(writeOperation); + + taskScheduler.pollTask(timeValue.getNanos() + 7).run(); + // No reads and no inflight requests, close + verify(transport, times(1)).onException(eq(channel), any(HttpReadTimeoutException.class)); + assertNull(taskScheduler.pollTask(timeValue.getNanos() + 9)); + } + + private FullHttpResponse executeCorsRequest(final Settings settings, final String originValue, final String host) throws IOException { + HttpHandlingSettings httpSettings = HttpHandlingSettings.fromSettings(settings); + NioCorsConfig corsConfig = NioHttpServerTransport.buildCorsConfig(settings); + HttpReadWriteHandler handler = new HttpReadWriteHandler(channel, transport, httpSettings, corsConfig, taskScheduler, + System::nanoTime); + handler.channelRegistered(); prepareHandlerForResponse(handler); DefaultFullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); if (originValue != null) { @@ -360,7 +401,7 @@ private FullHttpResponse executeCorsRequest(final Settings settings, final Strin - private NioHttpRequest prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOException { + private void prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOException { HttpMethod method = randomBoolean() ? HttpMethod.GET : HttpMethod.HEAD; HttpVersion version = randomBoolean() ? HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1; String uri = "http://localhost:9090/" + randomAlphaOfLength(8); @@ -385,7 +426,6 @@ private NioHttpRequest prepareHandlerForResponse(HttpReadWriteHandler handler) t assertEquals(HttpRequest.HttpVersion.HTTP_1_0, nioHttpRequest.protocolVersion()); } assertEquals(nioHttpRequest.uri(), uri); - return nioHttpRequest; } private InboundChannelBuffer toChannelBuffer(ByteBuf buf) { diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java index e3259b10b9745..634ea7b44af74 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java @@ -116,6 +116,20 @@ public final FullHttpResponse post(InetSocketAddress remoteAddress, FullHttpRequ return responses.iterator().next(); } + public final NioSocketChannel connect(InetSocketAddress remoteAddress) { + ChannelFactory factory = new ClientChannelFactory(new CountDownLatch(0), new + ArrayList<>()); + try { + NioSocketChannel nioSocketChannel = nioGroup.openChannel(remoteAddress, factory); + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + nioSocketChannel.addConnectListener(ActionListener.toBiConsumer(connectFuture)); + connectFuture.actionGet(); + return nioSocketChannel; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private void onException(Exception e) { logger.error("Exception from http client", e); } @@ -212,6 +226,9 @@ private HttpClientHandler(NioSocketChannel channel, CountDownLatch latch, Collec adaptor.addCloseListener((v, e) -> channel.close()); } + @Override + public void channelRegistered() {} + @Override public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { assert message instanceof HttpRequest : "Expected type HttpRequest.class, found: " + message.getClass(); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java index 2ffd5a64147cc..0b470fda00a6b 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -49,6 +50,7 @@ import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; @@ -66,6 +68,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -309,52 +313,47 @@ threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger assertThat(causeReference.get(), instanceOf(TooLongFrameException.class)); } -// public void testReadTimeout() throws Exception { -// final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { -// -// @Override -// public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { -// throw new AssertionError("Should not have received a dispatched request"); -// } -// -// @Override -// public void dispatchBadRequest(final RestRequest request, -// final RestChannel channel, -// final ThreadContext threadContext, -// final Throwable cause) { -// throw new AssertionError("Should not have received a dispatched request"); -// } -// -// }; -// -// Settings settings = Settings.builder() -// .put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300))) -// .build(); -// -// -// NioEventLoopGroup group = new NioEventLoopGroup(); -// try (NioHttpServerTransport transport = -// new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { -// transport.start(); -// final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); -// -// AtomicBoolean channelClosed = new AtomicBoolean(false); -// -// Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer() { -// -// @Override -// protected void initChannel(SocketChannel ch) { -// ch.pipeline().addLast(new ChannelHandlerAdapter() {}); -// -// } -// }).group(group); -// ChannelFuture connect = clientBootstrap.connect(remoteAddress.address()); -// connect.channel().closeFuture().addListener(future -> channelClosed.set(true)); -// -// assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS); -// -// } finally { -// group.shutdownGracefully().await(); -// } -// } + public void testReadTimeout() throws Exception { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + throw new AssertionError("Should not have received a dispatched request"); + } + + @Override + public void dispatchBadRequest(final RestRequest request, + final RestChannel channel, + final ThreadContext threadContext, + final Throwable cause) { + throw new AssertionError("Should not have received a dispatched request"); + } + + }; + + Settings settings = Settings.builder() + .put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300))) + .build(); + + + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (NioHttpClient client = new NioHttpClient()) { + NioSocketChannel channel = null; + try { + CountDownLatch channelClosedLatch = new CountDownLatch(1); + channel = client.connect(remoteAddress.address()); + channel.addCloseListener((r, t) -> channelClosedLatch.countDown()); + assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES)); + } finally { + if (channel != null) { + channel.close(); + } + } + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index b79a5e77309b9..ab6168cad522e 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -249,7 +249,7 @@ static int resolvePublishPort(Settings settings, List boundAdd return publishPort; } - protected void onException(HttpChannel channel, Exception e) { + public void onException(HttpChannel channel, Exception e) { if (lifecycle.started() == false) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); @@ -263,6 +263,9 @@ protected void onException(HttpChannel channel, Exception e) { logger.trace(() -> new ParameterizedMessage( "connect exception caught while handling client http traffic, closing connection {}", channel), e); CloseableChannel.closeChannel(channel); + } else if (e instanceof HttpReadTimeoutException) { + logger.trace(() -> new ParameterizedMessage("http read timeout, closing connection {}", channel), e); + CloseableChannel.closeChannel(channel); } else if (e instanceof CancelledKeyException) { logger.trace(() -> new ParameterizedMessage( "cancelled key exception caught while handling client http traffic, closing connection {}", channel), e); diff --git a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java index 568f2912a677c..805ebc3d95dd3 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java +++ b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java @@ -29,6 +29,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES; import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; @@ -43,11 +44,12 @@ public class HttpHandlingSettings { private final int compressionLevel; private final boolean detailedErrorsEnabled; private final int pipeliningMaxEvents; + private final long readTimeoutMillis; private boolean corsEnabled; public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeaderSize, int maxInitialLineLength, boolean resetCookies, boolean compression, int compressionLevel, boolean detailedErrorsEnabled, - int pipeliningMaxEvents, boolean corsEnabled) { + int pipeliningMaxEvents, long readTimeoutMillis, boolean corsEnabled) { this.maxContentLength = maxContentLength; this.maxChunkSize = maxChunkSize; this.maxHeaderSize = maxHeaderSize; @@ -57,6 +59,7 @@ public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeade this.compressionLevel = compressionLevel; this.detailedErrorsEnabled = detailedErrorsEnabled; this.pipeliningMaxEvents = pipeliningMaxEvents; + this.readTimeoutMillis = readTimeoutMillis; this.corsEnabled = corsEnabled; } @@ -70,6 +73,7 @@ public static HttpHandlingSettings fromSettings(Settings settings) { SETTING_HTTP_COMPRESSION_LEVEL.get(settings), SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings), SETTING_PIPELINING_MAX_EVENTS.get(settings), + SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis(), SETTING_CORS_ENABLED.get(settings)); } @@ -109,6 +113,10 @@ public int getPipeliningMaxEvents() { return pipeliningMaxEvents; } + public long getReadTimeoutMillis() { + return readTimeoutMillis; + } + public boolean isCorsEnabled() { return corsEnabled; } diff --git a/server/src/main/java/org/elasticsearch/http/HttpReadTimeoutException.java b/server/src/main/java/org/elasticsearch/http/HttpReadTimeoutException.java new file mode 100644 index 0000000000000..a4f54e92f34a6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpReadTimeoutException.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +public class HttpReadTimeoutException extends RuntimeException { + + public HttpReadTimeoutException(long readTimeoutMillis) { + super("http read timeout after " + readTimeoutMillis + "ms"); + + } + + public HttpReadTimeoutException(long readTimeoutMillis, Exception cause) { + super("http read timeout after " + readTimeoutMillis + "ms", cause); + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index df97d2e548e57..df3e1da5dce6c 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -237,7 +237,17 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui * timestamp, see {@link #absoluteTimeInMillis()}. */ public long relativeTimeInMillis() { - return cachedTimeThread.relativeTimeInMillis(); + return TimeValue.nsecToMSec(relativeTimeInNanos()); + } + + /** + * Returns a value of nanoseconds that may be used for relative time calculations. + * + * This method should only be used for calculating time deltas. For an epoch based + * timestamp, see {@link #absoluteTimeInMillis()}. + */ + public long relativeTimeInNanos() { + return cachedTimeThread.relativeTimeInNanos(); } /** @@ -534,30 +544,29 @@ static class CachedTimeThread extends Thread { final long interval; volatile boolean running = true; - volatile long relativeMillis; + volatile long relativeNanos; volatile long absoluteMillis; CachedTimeThread(String name, long interval) { super(name); this.interval = interval; - this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + this.relativeNanos = System.nanoTime(); this.absoluteMillis = System.currentTimeMillis(); setDaemon(true); } /** - * Return the current time used for relative calculations. This is - * {@link System#nanoTime()} truncated to milliseconds. + * Return the current time used for relative calculations. This is {@link System#nanoTime()}. *

* If {@link ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING} is set to 0 * then the cache is disabled and the method calls {@link System#nanoTime()} * whenever called. Typically used for testing. */ - long relativeTimeInMillis() { + long relativeTimeInNanos() { if (0 < interval) { - return relativeMillis; + return relativeNanos; } - return TimeValue.nsecToMSec(System.nanoTime()); + return System.nanoTime(); } /** @@ -578,7 +587,7 @@ long absoluteTimeInMillis() { @Override public void run() { while (running && 0 < interval) { - relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + relativeNanos = System.nanoTime(); absoluteMillis = System.currentTimeMillis(); try { Thread.sleep(interval); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java index 596f9c1e6e107..043af216b8f35 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java @@ -54,7 +54,7 @@ public SecurityNetty4HttpServerTransport(Settings settings, NetworkService netwo } @Override - protected void onException(HttpChannel channel, Exception e) { + public void onException(HttpChannel channel, Exception e) { securityExceptionHandler.accept(channel, e); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java index 8ecba16fa460d..b65f29eb95100 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java @@ -98,7 +98,7 @@ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) return new Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; HttpReadWriteHandler httpHandler = new HttpReadWriteHandler(httpChannel,SecurityNioHttpServerTransport.this, - handlingSettings, corsConfig); + handlingSettings, corsConfig, selector.getTaskScheduler(), threadPool::relativeTimeInNanos); InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier); Consumer exceptionHandler = (e) -> securityExceptionHandler.accept(httpChannel, e);