diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java index 2d57faf5cb897..07333aa2eebc4 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java @@ -34,6 +34,9 @@ public WriteOperation createWriteOperation(SocketChannelContext context, Object return new FlushReadyWrite(context, (ByteBuffer[]) message, listener); } + @Override + public void channelRegistered() {} + @Override public List writeToBytes(WriteOperation writeOperation) { assert writeOperation instanceof FlushReadyWrite : "Write operation must be flush ready"; diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java index 6b8688eccfd8c..92b276ad2d6da 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java @@ -28,6 +28,11 @@ */ public interface ReadWriteHandler { + /** + * This method is called when the channel is registered with its selector. + */ + void channelRegistered(); + /** * This method is called when a message is queued with a channel. It can be called from any thread. * This method should validate that the message is a valid type and return a write operation object diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java index 1444422f7a7f6..a926bbc9710d0 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -169,6 +169,7 @@ protected FlushOperation getPendingFlush() { @Override protected void register() throws IOException { super.register(); + readWriteHandler.channelRegistered(); if (allowChannelPredicate.test(channel) == false) { closeNow = true; } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java index e197230147c8b..c460e2147986a 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java @@ -45,7 +45,7 @@ public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) { return delayedTask; } - Runnable pollTask(long relativeNanos) { + public Runnable pollTask(long relativeNanos) { DelayedTask task; while ((task = tasks.peek()) != null) { if (relativeNanos - task.deadline >= 0) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index c0dc011a06c95..356cfa0bbf99d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -45,7 +45,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -59,6 +58,7 @@ import org.elasticsearch.http.AbstractHttpServerTransport; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; +import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.netty4.cors.Netty4CorsConfig; import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder; @@ -289,12 +289,9 @@ protected void stopInternal() { } @Override - protected void onException(HttpChannel channel, Exception cause) { + public void onException(HttpChannel channel, Exception cause) { if (cause instanceof ReadTimeoutException) { - if (logger.isTraceEnabled()) { - logger.trace("Http read timeout {}", channel); - } - CloseableChannel.closeChannel(channel); + super.onException(channel, new HttpReadTimeoutException(readTimeoutMillis, cause)); } else { super.onException(channel, cause); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 63e38823acb31..bc4ebe5672ec7 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -73,8 +73,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -346,7 +346,7 @@ public void dispatchBadRequest(final RestRequest request, transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - AtomicBoolean channelClosed = new AtomicBoolean(false); + CountDownLatch channelClosedLatch = new CountDownLatch(1); Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer() { @@ -357,9 +357,9 @@ protected void initChannel(SocketChannel ch) { } }).group(group); ChannelFuture connect = clientBootstrap.connect(remoteAddress.address()); - connect.channel().closeFuture().addListener(future -> channelClosed.set(true)); + connect.channel().closeFuture().addListener(future -> channelClosedLatch.countDown()); - assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS); + assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES)); } finally { group.shutdownGracefully().await(); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 17a5c1fb97e80..7a4fbfe42aefa 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -30,31 +30,45 @@ import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsHandler; import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.ReadWriteHandler; import org.elasticsearch.nio.SocketChannelContext; +import org.elasticsearch.nio.TaskScheduler; import org.elasticsearch.nio.WriteOperation; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.LongSupplier; public class HttpReadWriteHandler implements ReadWriteHandler { private final NettyAdaptor adaptor; private final NioHttpChannel nioHttpChannel; private final NioHttpServerTransport transport; + private final TaskScheduler taskScheduler; + private final LongSupplier nanoClock; + private final long readTimeoutNanos; + private boolean channelRegistered = false; + private boolean requestSinceReadTimeoutTrigger = false; + private int inFlightRequests = 0; public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTransport transport, HttpHandlingSettings settings, - NioCorsConfig corsConfig) { + NioCorsConfig corsConfig, TaskScheduler taskScheduler, LongSupplier nanoClock) { this.nioHttpChannel = nioHttpChannel; this.transport = transport; + this.taskScheduler = taskScheduler; + this.nanoClock = nanoClock; + this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis()); List handlers = new ArrayList<>(5); HttpRequestDecoder decoder = new HttpRequestDecoder(settings.getMaxInitialLineLength(), settings.getMaxHeaderSize(), @@ -77,10 +91,21 @@ public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTranspor } @Override - public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException { + public void channelRegistered() { + channelRegistered = true; + if (readTimeoutNanos > 0) { + scheduleReadTimeout(); + } + } + + @Override + public int consumeReads(InboundChannelBuffer channelBuffer) { + assert channelRegistered : "channelRegistered should have been called"; int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex())); Object message; while ((message = adaptor.pollInboundMessage()) != null) { + ++inFlightRequests; + requestSinceReadTimeoutTrigger = true; handleRequest(message); } @@ -96,6 +121,11 @@ public WriteOperation createWriteOperation(SocketChannelContext context, Object @Override public List writeToBytes(WriteOperation writeOperation) { + assert writeOperation.getObject() instanceof NioHttpResponse : "This channel only supports messages that are of type: " + + NioHttpResponse.class + ". Found type: " + writeOperation.getObject().getClass() + "."; + assert channelRegistered : "channelRegistered should have been called"; + --inFlightRequests; + assert inFlightRequests >= 0 : "Inflight requests should never drop below zero, found: " + inFlightRequests; adaptor.write(writeOperation); return pollFlushOperations(); } @@ -152,4 +182,17 @@ private void handleRequest(Object msg) { request.release(); } } + + private void maybeReadTimeout() { + if (requestSinceReadTimeoutTrigger == false && inFlightRequests == 0) { + transport.onException(nioHttpChannel, new HttpReadTimeoutException(TimeValue.nsecToMSec(readTimeoutNanos))); + } else { + requestSinceReadTimeoutTrigger = false; + scheduleReadTimeout(); + } + } + + private void scheduleReadTimeout() { + taskScheduler.scheduleAtRelativeTime(this::maybeReadTimeout, nanoClock.getAsLong() + readTimeoutNanos); + } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index 57936ff70c628..2730cb6d3a9b3 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -211,7 +211,7 @@ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) return new Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel,NioHttpServerTransport.this, - handlingSettings, corsConfig); + handlingSettings, corsConfig, selector.getTaskScheduler(), threadPool::relativeTimeInMillis); Consumer exceptionHandler = (e) -> onException(httpChannel, e); SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline, new InboundChannelBuffer(pageSupplier)); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java index d7e61f21173bb..3d136705ef9d9 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java @@ -33,12 +33,13 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; - import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; +import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.http.HttpTransportSettings; @@ -48,6 +49,7 @@ import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.SocketChannelContext; +import org.elasticsearch.nio.TaskScheduler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -56,6 +58,8 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.function.BiConsumer; @@ -63,19 +67,14 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION_LEVEL; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -84,8 +83,9 @@ public class HttpReadWriteHandlerTests extends ESTestCase { private HttpReadWriteHandler handler; - private NioHttpChannel nioHttpChannel; + private NioHttpChannel channel; private NioHttpServerTransport transport; + private TaskScheduler taskScheduler; private final RequestEncoder requestEncoder = new RequestEncoder(); private final ResponseDecoder responseDecoder = new ResponseDecoder(); @@ -93,22 +93,14 @@ public class HttpReadWriteHandlerTests extends ESTestCase { @Before public void setMocks() { transport = mock(NioHttpServerTransport.class); - Settings settings = Settings.EMPTY; - ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.getDefault(settings); - ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.getDefault(settings); - ByteSizeValue maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.getDefault(settings); - HttpHandlingSettings httpHandlingSettings = new HttpHandlingSettings(1024, - Math.toIntExact(maxChunkSize.getBytes()), - Math.toIntExact(maxHeaderSize.getBytes()), - Math.toIntExact(maxInitialLineLength.getBytes()), - SETTING_HTTP_RESET_COOKIES.getDefault(settings), - SETTING_HTTP_COMPRESSION.getDefault(settings), - SETTING_HTTP_COMPRESSION_LEVEL.getDefault(settings), - SETTING_HTTP_DETAILED_ERRORS_ENABLED.getDefault(settings), - SETTING_PIPELINING_MAX_EVENTS.getDefault(settings), - SETTING_CORS_ENABLED.getDefault(settings)); - nioHttpChannel = mock(NioHttpChannel.class); - handler = new HttpReadWriteHandler(nioHttpChannel, transport, httpHandlingSettings, NioCorsConfigBuilder.forAnyOrigin().build()); + Settings settings = Settings.builder().put(SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(1024)).build(); + HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings); + channel = mock(NioHttpChannel.class); + taskScheduler = mock(TaskScheduler.class); + + NioCorsConfig corsConfig = NioCorsConfigBuilder.forAnyOrigin().build(); + handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, corsConfig, taskScheduler, System::nanoTime); + handler.channelRegistered(); } public void testSuccessfulDecodeHttpRequest() throws IOException { @@ -188,7 +180,7 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t flushOperation.getListener().accept(null, null); // Since we have keep-alive set to false, we should close the channel after the response has been // flushed - verify(nioHttpChannel).close(); + verify(channel).close(); } finally { response.release(); } @@ -335,10 +327,59 @@ public void testThatAnyOriginWorks() throws IOException { } } - private FullHttpResponse executeCorsRequest(final Settings settings, final String originValue, final String host) throws IOException { + @SuppressWarnings("unchecked") + public void testReadTimeout() throws IOException { + TimeValue timeValue = TimeValue.timeValueMillis(500); + Settings settings = Settings.builder().put(SETTING_HTTP_READ_TIMEOUT.getKey(), timeValue).build(); HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings); - NioCorsConfig nioCorsConfig = NioHttpServerTransport.buildCorsConfig(settings); - HttpReadWriteHandler handler = new HttpReadWriteHandler(nioHttpChannel, transport, httpHandlingSettings, nioCorsConfig); + DefaultFullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + NioHttpRequest nioHttpRequest = new NioHttpRequest(nettyRequest, 0); + NioHttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); + httpResponse.addHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), "0"); + + NioCorsConfig corsConfig = NioCorsConfigBuilder.forAnyOrigin().build(); + TaskScheduler taskScheduler = new TaskScheduler(); + + Iterator timeValues = Arrays.asList(0, 2, 4, 6, 8).iterator(); + handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, corsConfig, taskScheduler, timeValues::next); + handler.channelRegistered(); + + prepareHandlerForResponse(handler); + SocketChannelContext context = mock(SocketChannelContext.class); + HttpWriteOperation writeOperation = new HttpWriteOperation(context, httpResponse, mock(BiConsumer.class)); + handler.writeToBytes(writeOperation); + + taskScheduler.pollTask(timeValue.getNanos() + 1).run(); + // There was a read. Do not close. + verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class)); + + prepareHandlerForResponse(handler); + prepareHandlerForResponse(handler); + + taskScheduler.pollTask(timeValue.getNanos() + 3).run(); + // There was a read. Do not close. + verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class)); + + handler.writeToBytes(writeOperation); + + taskScheduler.pollTask(timeValue.getNanos() + 5).run(); + // There has not been a read, however there is still an inflight request. Do not close. + verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class)); + + handler.writeToBytes(writeOperation); + + taskScheduler.pollTask(timeValue.getNanos() + 7).run(); + // No reads and no inflight requests, close + verify(transport, times(1)).onException(eq(channel), any(HttpReadTimeoutException.class)); + assertNull(taskScheduler.pollTask(timeValue.getNanos() + 9)); + } + + private FullHttpResponse executeCorsRequest(final Settings settings, final String originValue, final String host) throws IOException { + HttpHandlingSettings httpSettings = HttpHandlingSettings.fromSettings(settings); + NioCorsConfig corsConfig = NioHttpServerTransport.buildCorsConfig(settings); + HttpReadWriteHandler handler = new HttpReadWriteHandler(channel, transport, httpSettings, corsConfig, taskScheduler, + System::nanoTime); + handler.channelRegistered(); prepareHandlerForResponse(handler); DefaultFullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); if (originValue != null) { @@ -360,7 +401,7 @@ private FullHttpResponse executeCorsRequest(final Settings settings, final Strin - private NioHttpRequest prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOException { + private void prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOException { HttpMethod method = randomBoolean() ? HttpMethod.GET : HttpMethod.HEAD; HttpVersion version = randomBoolean() ? HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1; String uri = "http://localhost:9090/" + randomAlphaOfLength(8); @@ -385,7 +426,6 @@ private NioHttpRequest prepareHandlerForResponse(HttpReadWriteHandler handler) t assertEquals(HttpRequest.HttpVersion.HTTP_1_0, nioHttpRequest.protocolVersion()); } assertEquals(nioHttpRequest.uri(), uri); - return nioHttpRequest; } private InboundChannelBuffer toChannelBuffer(ByteBuf buf) { diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java index e3259b10b9745..634ea7b44af74 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java @@ -116,6 +116,20 @@ public final FullHttpResponse post(InetSocketAddress remoteAddress, FullHttpRequ return responses.iterator().next(); } + public final NioSocketChannel connect(InetSocketAddress remoteAddress) { + ChannelFactory factory = new ClientChannelFactory(new CountDownLatch(0), new + ArrayList<>()); + try { + NioSocketChannel nioSocketChannel = nioGroup.openChannel(remoteAddress, factory); + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + nioSocketChannel.addConnectListener(ActionListener.toBiConsumer(connectFuture)); + connectFuture.actionGet(); + return nioSocketChannel; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private void onException(Exception e) { logger.error("Exception from http client", e); } @@ -212,6 +226,9 @@ private HttpClientHandler(NioSocketChannel channel, CountDownLatch latch, Collec adaptor.addCloseListener((v, e) -> channel.close()); } + @Override + public void channelRegistered() {} + @Override public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { assert message instanceof HttpRequest : "Expected type HttpRequest.class, found: " + message.getClass(); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java index 2ffd5a64147cc..0b470fda00a6b 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -49,6 +50,7 @@ import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; @@ -66,6 +68,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -309,52 +313,47 @@ threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger assertThat(causeReference.get(), instanceOf(TooLongFrameException.class)); } -// public void testReadTimeout() throws Exception { -// final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { -// -// @Override -// public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { -// throw new AssertionError("Should not have received a dispatched request"); -// } -// -// @Override -// public void dispatchBadRequest(final RestRequest request, -// final RestChannel channel, -// final ThreadContext threadContext, -// final Throwable cause) { -// throw new AssertionError("Should not have received a dispatched request"); -// } -// -// }; -// -// Settings settings = Settings.builder() -// .put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300))) -// .build(); -// -// -// NioEventLoopGroup group = new NioEventLoopGroup(); -// try (NioHttpServerTransport transport = -// new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { -// transport.start(); -// final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); -// -// AtomicBoolean channelClosed = new AtomicBoolean(false); -// -// Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer() { -// -// @Override -// protected void initChannel(SocketChannel ch) { -// ch.pipeline().addLast(new ChannelHandlerAdapter() {}); -// -// } -// }).group(group); -// ChannelFuture connect = clientBootstrap.connect(remoteAddress.address()); -// connect.channel().closeFuture().addListener(future -> channelClosed.set(true)); -// -// assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS); -// -// } finally { -// group.shutdownGracefully().await(); -// } -// } + public void testReadTimeout() throws Exception { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + throw new AssertionError("Should not have received a dispatched request"); + } + + @Override + public void dispatchBadRequest(final RestRequest request, + final RestChannel channel, + final ThreadContext threadContext, + final Throwable cause) { + throw new AssertionError("Should not have received a dispatched request"); + } + + }; + + Settings settings = Settings.builder() + .put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300))) + .build(); + + + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (NioHttpClient client = new NioHttpClient()) { + NioSocketChannel channel = null; + try { + CountDownLatch channelClosedLatch = new CountDownLatch(1); + channel = client.connect(remoteAddress.address()); + channel.addCloseListener((r, t) -> channelClosedLatch.countDown()); + assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES)); + } finally { + if (channel != null) { + channel.close(); + } + } + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index b79a5e77309b9..ab6168cad522e 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -249,7 +249,7 @@ static int resolvePublishPort(Settings settings, List boundAdd return publishPort; } - protected void onException(HttpChannel channel, Exception e) { + public void onException(HttpChannel channel, Exception e) { if (lifecycle.started() == false) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); @@ -263,6 +263,9 @@ protected void onException(HttpChannel channel, Exception e) { logger.trace(() -> new ParameterizedMessage( "connect exception caught while handling client http traffic, closing connection {}", channel), e); CloseableChannel.closeChannel(channel); + } else if (e instanceof HttpReadTimeoutException) { + logger.trace(() -> new ParameterizedMessage("http read timeout, closing connection {}", channel), e); + CloseableChannel.closeChannel(channel); } else if (e instanceof CancelledKeyException) { logger.trace(() -> new ParameterizedMessage( "cancelled key exception caught while handling client http traffic, closing connection {}", channel), e); diff --git a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java index 568f2912a677c..805ebc3d95dd3 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java +++ b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java @@ -29,6 +29,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES; import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; @@ -43,11 +44,12 @@ public class HttpHandlingSettings { private final int compressionLevel; private final boolean detailedErrorsEnabled; private final int pipeliningMaxEvents; + private final long readTimeoutMillis; private boolean corsEnabled; public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeaderSize, int maxInitialLineLength, boolean resetCookies, boolean compression, int compressionLevel, boolean detailedErrorsEnabled, - int pipeliningMaxEvents, boolean corsEnabled) { + int pipeliningMaxEvents, long readTimeoutMillis, boolean corsEnabled) { this.maxContentLength = maxContentLength; this.maxChunkSize = maxChunkSize; this.maxHeaderSize = maxHeaderSize; @@ -57,6 +59,7 @@ public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeade this.compressionLevel = compressionLevel; this.detailedErrorsEnabled = detailedErrorsEnabled; this.pipeliningMaxEvents = pipeliningMaxEvents; + this.readTimeoutMillis = readTimeoutMillis; this.corsEnabled = corsEnabled; } @@ -70,6 +73,7 @@ public static HttpHandlingSettings fromSettings(Settings settings) { SETTING_HTTP_COMPRESSION_LEVEL.get(settings), SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings), SETTING_PIPELINING_MAX_EVENTS.get(settings), + SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis(), SETTING_CORS_ENABLED.get(settings)); } @@ -109,6 +113,10 @@ public int getPipeliningMaxEvents() { return pipeliningMaxEvents; } + public long getReadTimeoutMillis() { + return readTimeoutMillis; + } + public boolean isCorsEnabled() { return corsEnabled; } diff --git a/server/src/main/java/org/elasticsearch/http/HttpReadTimeoutException.java b/server/src/main/java/org/elasticsearch/http/HttpReadTimeoutException.java new file mode 100644 index 0000000000000..a4f54e92f34a6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpReadTimeoutException.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +public class HttpReadTimeoutException extends RuntimeException { + + public HttpReadTimeoutException(long readTimeoutMillis) { + super("http read timeout after " + readTimeoutMillis + "ms"); + + } + + public HttpReadTimeoutException(long readTimeoutMillis, Exception cause) { + super("http read timeout after " + readTimeoutMillis + "ms", cause); + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 3468d3d30212b..a72b66de52845 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -227,7 +227,17 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui * timestamp, see {@link #absoluteTimeInMillis()}. */ public long relativeTimeInMillis() { - return cachedTimeThread.relativeTimeInMillis(); + return TimeValue.nsecToMSec(relativeTimeInNanos()); + } + + /** + * Returns a value of nanoseconds that may be used for relative time calculations. + * + * This method should only be used for calculating time deltas. For an epoch based + * timestamp, see {@link #absoluteTimeInMillis()}. + */ + public long relativeTimeInNanos() { + return cachedTimeThread.relativeTimeInNanos(); } /** @@ -490,30 +500,29 @@ static class CachedTimeThread extends Thread { final long interval; volatile boolean running = true; - volatile long relativeMillis; + volatile long relativeNanos; volatile long absoluteMillis; CachedTimeThread(String name, long interval) { super(name); this.interval = interval; - this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + this.relativeNanos = System.nanoTime(); this.absoluteMillis = System.currentTimeMillis(); setDaemon(true); } /** - * Return the current time used for relative calculations. This is - * {@link System#nanoTime()} truncated to milliseconds. + * Return the current time used for relative calculations. This is {@link System#nanoTime()}. *

* If {@link ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING} is set to 0 * then the cache is disabled and the method calls {@link System#nanoTime()} * whenever called. Typically used for testing. */ - long relativeTimeInMillis() { + long relativeTimeInNanos() { if (0 < interval) { - return relativeMillis; + return relativeNanos; } - return TimeValue.nsecToMSec(System.nanoTime()); + return System.nanoTime(); } /** @@ -534,7 +543,7 @@ long absoluteTimeInMillis() { @Override public void run() { while (running && 0 < interval) { - relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + relativeNanos = System.nanoTime(); absoluteMillis = System.currentTimeMillis(); try { Thread.sleep(interval); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java index 596f9c1e6e107..043af216b8f35 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java @@ -54,7 +54,7 @@ public SecurityNetty4HttpServerTransport(Settings settings, NetworkService netwo } @Override - protected void onException(HttpChannel channel, Exception e) { + public void onException(HttpChannel channel, Exception e) { securityExceptionHandler.accept(channel, e); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java index 8ecba16fa460d..b65f29eb95100 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java @@ -98,7 +98,7 @@ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) return new Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; HttpReadWriteHandler httpHandler = new HttpReadWriteHandler(httpChannel,SecurityNioHttpServerTransport.this, - handlingSettings, corsConfig); + handlingSettings, corsConfig, selector.getTaskScheduler(), threadPool::relativeTimeInNanos); InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier); Consumer exceptionHandler = (e) -> securityExceptionHandler.accept(httpChannel, e);