From 47f3a4d9eef55ad1439e4e4caab3d58347eb8552 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 6 Dec 2017 16:36:13 -0700 Subject: [PATCH 1/6] WIP --- .../transport/nio/ByteBufferProvider.java | 52 +++++++++++++++++++ .../transport/nio/InboundChannelBuffer.java | 36 ++++++++----- .../transport/nio/NioTransport.java | 16 +++++- .../nio/channel/NioSocketChannel.java | 1 + .../transport/nio/channel/ReadContext.java | 5 +- .../transport/nio/channel/TcpReadContext.java | 21 ++++---- .../nio/channel/TcpReadContextTests.java | 14 ++++- 7 files changed, 117 insertions(+), 28 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java new file mode 100644 index 0000000000000..1b59f68c5f753 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java @@ -0,0 +1,52 @@ +package org.elasticsearch.transport.nio; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; + +import java.nio.ByteBuffer; + +public class ByteBufferProvider { + + public static final ByteBufferProvider NON_RECYCLING_INSTANCE = new ByteBufferProvider(); + + private final PageCacheRecycler pageCacheRecycler; + + private ByteBufferProvider() { + this(null); + } + + public ByteBufferProvider(PageCacheRecycler pageCacheRecycler) { + this.pageCacheRecycler = pageCacheRecycler; + } + + public ReleasableByteBuffer getByteBufferPage() { + if (pageCacheRecycler != null) { + Recycler.V bytePage = pageCacheRecycler.bytePage(false); + return new ReleasableByteBuffer(ByteBuffer.wrap(bytePage.v()), bytePage); + } else { + return new ReleasableByteBuffer(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); + } + } + + static class ReleasableByteBuffer implements Releasable { + + private final ByteBuffer byteBuffer; + private final Releasable releasable; + + private ReleasableByteBuffer(ByteBuffer byteBuffer, Releasable releasable) { + this.byteBuffer = byteBuffer; + this.releasable = releasable; + } + + ByteBuffer byteBuffer() { + return byteBuffer; + } + + @Override + public void close() { + releasable.close(); + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 46cec52bb6c32..cad503b1c9dfb 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -19,10 +19,11 @@ package org.elasticsearch.transport.nio; +import org.elasticsearch.common.lease.Releasable; + import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Iterator; -import java.util.function.Supplier; /** * This is a channel byte buffer composed internally of 16kb pages. When an entire message has been read @@ -30,7 +31,7 @@ * the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can * be called and the buffer will expand using the supplier provided. */ -public final class InboundChannelBuffer { +public final class InboundChannelBuffer implements Releasable { private static final int PAGE_SIZE = 1 << 14; private static final int PAGE_MASK = PAGE_SIZE - 1; @@ -38,8 +39,8 @@ public final class InboundChannelBuffer { private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; - private final ArrayDeque pages; - private final Supplier pageSupplier; + private final ArrayDeque pages; + private final ByteBufferProvider pageSupplier; private long capacity = 0; private long internalIndex = 0; @@ -47,22 +48,31 @@ public final class InboundChannelBuffer { private int offset = 0; public InboundChannelBuffer() { - this(() -> ByteBuffer.wrap(new byte[PAGE_SIZE])); + this(ByteBufferProvider.NON_RECYCLING_INSTANCE); } - private InboundChannelBuffer(Supplier pageSupplier) { + public InboundChannelBuffer(ByteBufferProvider pageSupplier) { this.pageSupplier = pageSupplier; this.pages = new ArrayDeque<>(); this.capacity = PAGE_SIZE * pages.size(); ensureCapacity(PAGE_SIZE); } + @Override + public void close() { + ByteBufferProvider.ReleasableByteBuffer buffer; + while ((buffer = pages.pollFirst()) != null) { + buffer.close(); + } + } + public void ensureCapacity(long requiredCapacity) { if (capacity < requiredCapacity) { int numPages = numPages(requiredCapacity + offset); int pagesToAdd = numPages - pages.size(); for (int i = 0; i < pagesToAdd; i++) { - pages.addLast(pageSupplier.get()); + ByteBufferProvider.ReleasableByteBuffer page = pageSupplier.getByteBufferPage(); + pages.addLast(page); } capacity += pagesToAdd * PAGE_SIZE; } @@ -112,12 +122,12 @@ public ByteBuffer[] sliceBuffersTo(long to) { } ByteBuffer[] buffers = new ByteBuffer[pageCount]; - Iterator pageIterator = pages.iterator(); - ByteBuffer firstBuffer = pageIterator.next().duplicate(); + Iterator pageIterator = pages.iterator(); + ByteBuffer firstBuffer = pageIterator.next().byteBuffer().duplicate(); firstBuffer.position(firstBuffer.position() + offset); buffers[0] = firstBuffer; for (int i = 1; i < buffers.length; i++) { - buffers[i] = pageIterator.next().duplicate(); + buffers[i] = pageIterator.next().byteBuffer().duplicate(); } if (finalLimit != 0) { buffers[buffers.length - 1].limit(finalLimit); @@ -148,11 +158,11 @@ public ByteBuffer[] sliceBuffersFrom(long from) { int indexInPage = indexInPage(indexWithOffset); ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex]; - Iterator pageIterator = pages.descendingIterator(); + Iterator pageIterator = pages.descendingIterator(); for (int i = buffers.length - 1; i > 0; --i) { - buffers[i] = pageIterator.next().duplicate(); + buffers[i] = pageIterator.next().byteBuffer().duplicate(); } - ByteBuffer firstPostIndexBuffer = pageIterator.next().duplicate(); + ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer().duplicate(); firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); buffers[0] = firstPostIndexBuffer; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index d1ab10fb5683a..b1a5e5f4ab1e0 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -69,6 +69,7 @@ public class NioTransport extends TcpTransport { private final OpenChannels openChannels = new OpenChannels(logger); private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); + private final ByteBufferProvider byteBufferProvider; private final ArrayList acceptors = new ArrayList<>(); private final ArrayList socketSelectors = new ArrayList<>(); private RoundRobinSelectorSupplier clientSelectorSupplier; @@ -77,7 +78,15 @@ public class NioTransport extends TcpTransport { public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { + this(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, + ByteBufferProvider.NON_RECYCLING_INSTANCE); + } + + public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, + ByteBufferProvider byteBufferProvider) { super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); + this.byteBufferProvider = byteBufferProvider; } @Override @@ -184,8 +193,11 @@ final void exceptionCaught(NioSocketChannel channel, Exception exception) { } private Consumer getContextSetter(String profileName) { - return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c), - this::exceptionCaught); + return (c) -> { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(byteBufferProvider); + c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), channelBuffer), + new TcpWriteContext(c), this::exceptionCaught); + }; } private void acceptChannel(NioSocketChannel channel) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index 0f6c671508815..06f8aec627936 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -56,6 +56,7 @@ public void closeFromSelector() throws IOException { if (writeContext.hasQueuedWriteOps()) { writeContext.clearQueuedWriteOps(new ClosedChannelException()); } + readContext.close(); super.closeFromSelector(); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ReadContext.java index 9d2919b19286a..243a6d8b239c2 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ReadContext.java @@ -21,8 +21,11 @@ import java.io.IOException; -public interface ReadContext { +public interface ReadContext extends AutoCloseable { int read() throws IOException; + @Override + void close(); + } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index ae9fe0fdc933e..95a00403deb16 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.bytes.ByteBufferReference; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.transport.nio.ByteBufferProvider; import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; @@ -32,17 +33,13 @@ public class TcpReadContext implements ReadContext { private final TcpReadHandler handler; private final TcpNioSocketChannel channel; - private final TcpFrameDecoder frameDecoder; - private final InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + private final InboundChannelBuffer channelBuffer; + private final TcpFrameDecoder frameDecoder = new TcpFrameDecoder(); - public TcpReadContext(NioSocketChannel channel, TcpReadHandler handler) { - this((TcpNioSocketChannel) channel, handler, new TcpFrameDecoder()); - } - - public TcpReadContext(TcpNioSocketChannel channel, TcpReadHandler handler, TcpFrameDecoder frameDecoder) { + public TcpReadContext(NioSocketChannel channel, TcpReadHandler handler, InboundChannelBuffer channelBuffer) { this.handler = handler; - this.channel = channel; - this.frameDecoder = frameDecoder; + this.channel = (TcpNioSocketChannel) channel; + this.channelBuffer = channelBuffer; } @Override @@ -82,6 +79,11 @@ public int read() throws IOException { return bytesRead; } + @Override + public void close() { + channelBuffer.close(); + } + private static BytesReference toBytesReference(InboundChannelBuffer channelBuffer) { ByteBuffer[] writtenToBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex()); ByteBufferReference[] references = new ByteBufferReference[writtenToBuffers.length]; @@ -91,5 +93,4 @@ private static BytesReference toBytesReference(InboundChannelBuffer channelBuffe return new CompositeBytesReference(references); } - } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index 73583353f73db..8da5f65a772d2 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.nio.ByteBufferProvider; import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; import org.junit.Before; @@ -45,12 +46,12 @@ public class TcpReadContextTests extends ESTestCase { private TcpReadContext readContext; @Before - public void init() throws IOException { + public void init() { handler = mock(TcpReadHandler.class); messageLength = randomInt(96) + 4; channel = mock(TcpNioSocketChannel.class); - readContext = new TcpReadContext(channel, handler); + readContext = new TcpReadContext(channel, handler, new InboundChannelBuffer()); } public void testSuccessfulRead() throws IOException { @@ -122,6 +123,15 @@ public void testReadThrowsIOException() throws IOException { } } + public void closeClosesChannelBuffer() { + InboundChannelBuffer buffer = mock(InboundChannelBuffer.class); + TcpReadContext readContext = new TcpReadContext(channel, handler, buffer); + + readContext.close(); + + verify(buffer).close(); + } + private static byte[] combineMessageAndHeader(byte[] bytes) { return combineMessageAndHeader(bytes, bytes.length); } From c0cab82a927a109c50c655396ae02ade7e3d1245 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 6 Dec 2017 17:05:30 -0700 Subject: [PATCH 2/6] WIP --- .../transport/nio/ByteBufferProvider.java | 52 ------------------- .../transport/nio/InboundChannelBuffer.java | 42 +++++++++++---- .../transport/nio/NioTransport.java | 15 ++---- .../transport/nio/channel/TcpReadContext.java | 1 - .../nio/channel/TcpReadContextTests.java | 1 - 5 files changed, 36 insertions(+), 75 deletions(-) delete mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java deleted file mode 100644 index 1b59f68c5f753..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/ByteBufferProvider.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.elasticsearch.transport.nio; - -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.recycler.Recycler; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.PageCacheRecycler; - -import java.nio.ByteBuffer; - -public class ByteBufferProvider { - - public static final ByteBufferProvider NON_RECYCLING_INSTANCE = new ByteBufferProvider(); - - private final PageCacheRecycler pageCacheRecycler; - - private ByteBufferProvider() { - this(null); - } - - public ByteBufferProvider(PageCacheRecycler pageCacheRecycler) { - this.pageCacheRecycler = pageCacheRecycler; - } - - public ReleasableByteBuffer getByteBufferPage() { - if (pageCacheRecycler != null) { - Recycler.V bytePage = pageCacheRecycler.bytePage(false); - return new ReleasableByteBuffer(ByteBuffer.wrap(bytePage.v()), bytePage); - } else { - return new ReleasableByteBuffer(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); - } - } - - static class ReleasableByteBuffer implements Releasable { - - private final ByteBuffer byteBuffer; - private final Releasable releasable; - - private ReleasableByteBuffer(ByteBuffer byteBuffer, Releasable releasable) { - this.byteBuffer = byteBuffer; - this.releasable = releasable; - } - - ByteBuffer byteBuffer() { - return byteBuffer; - } - - @Override - public void close() { - releasable.close(); - } - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index cad503b1c9dfb..5e92a7493c520 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -20,10 +20,12 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.BigArrays; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Iterator; +import java.util.function.Supplier; /** * This is a channel byte buffer composed internally of 16kb pages. When an entire message has been read @@ -39,8 +41,8 @@ public final class InboundChannelBuffer implements Releasable { private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; - private final ArrayDeque pages; - private final ByteBufferProvider pageSupplier; + private final ArrayDeque pages; + private final Supplier pageSupplier; private long capacity = 0; private long internalIndex = 0; @@ -48,10 +50,10 @@ public final class InboundChannelBuffer implements Releasable { private int offset = 0; public InboundChannelBuffer() { - this(ByteBufferProvider.NON_RECYCLING_INSTANCE); + this(() -> new Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {})); } - public InboundChannelBuffer(ByteBufferProvider pageSupplier) { + public InboundChannelBuffer(Supplier pageSupplier) { this.pageSupplier = pageSupplier; this.pages = new ArrayDeque<>(); this.capacity = PAGE_SIZE * pages.size(); @@ -60,9 +62,9 @@ public InboundChannelBuffer(ByteBufferProvider pageSupplier) { @Override public void close() { - ByteBufferProvider.ReleasableByteBuffer buffer; - while ((buffer = pages.pollFirst()) != null) { - buffer.close(); + Page page; + while ((page = pages.pollFirst()) != null) { + page.close(); } } @@ -71,7 +73,7 @@ public void ensureCapacity(long requiredCapacity) { int numPages = numPages(requiredCapacity + offset); int pagesToAdd = numPages - pages.size(); for (int i = 0; i < pagesToAdd; i++) { - ByteBufferProvider.ReleasableByteBuffer page = pageSupplier.getByteBufferPage(); + Page page = pageSupplier.get(); pages.addLast(page); } capacity += pagesToAdd * PAGE_SIZE; @@ -122,7 +124,7 @@ public ByteBuffer[] sliceBuffersTo(long to) { } ByteBuffer[] buffers = new ByteBuffer[pageCount]; - Iterator pageIterator = pages.iterator(); + Iterator pageIterator = pages.iterator(); ByteBuffer firstBuffer = pageIterator.next().byteBuffer().duplicate(); firstBuffer.position(firstBuffer.position() + offset); buffers[0] = firstBuffer; @@ -158,7 +160,7 @@ public ByteBuffer[] sliceBuffersFrom(long from) { int indexInPage = indexInPage(indexWithOffset); ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex]; - Iterator pageIterator = pages.descendingIterator(); + Iterator pageIterator = pages.descendingIterator(); for (int i = buffers.length - 1; i > 0; --i) { buffers[i] = pageIterator.next().byteBuffer().duplicate(); } @@ -211,4 +213,24 @@ private int pageIndex(long index) { private int indexInPage(long index) { return (int) (index & PAGE_MASK); } + + static class Page implements Releasable { + + private final ByteBuffer byteBuffer; + private final Releasable releasable; + + Page(ByteBuffer byteBuffer, Releasable releasable) { + this.byteBuffer = byteBuffer; + this.releasable = releasable; + } + + @Override + public void close() { + releasable.close(); + } + + private ByteBuffer byteBuffer() { + return byteBuffer; + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index b1a5e5f4ab1e0..e02ef72a5b287 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadFactory; @@ -69,7 +70,6 @@ public class NioTransport extends TcpTransport { private final OpenChannels openChannels = new OpenChannels(logger); private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); - private final ByteBufferProvider byteBufferProvider; private final ArrayList acceptors = new ArrayList<>(); private final ArrayList socketSelectors = new ArrayList<>(); private RoundRobinSelectorSupplier clientSelectorSupplier; @@ -78,15 +78,7 @@ public class NioTransport extends TcpTransport { public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - this(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, - ByteBufferProvider.NON_RECYCLING_INSTANCE); - } - - public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, - NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, - ByteBufferProvider byteBufferProvider) { super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - this.byteBufferProvider = byteBufferProvider; } @Override @@ -194,8 +186,9 @@ final void exceptionCaught(NioSocketChannel channel, Exception exception) { private Consumer getContextSetter(String profileName) { return (c) -> { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(byteBufferProvider); - c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), channelBuffer), + Supplier pageSupplier = () -> + new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); + c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), new InboundChannelBuffer(pageSupplier)), new TcpWriteContext(c), this::exceptionCaught); }; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 95a00403deb16..24a671412d220 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.bytes.ByteBufferReference; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import org.elasticsearch.transport.nio.ByteBufferProvider; import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index 8da5f65a772d2..ad6db666ac371 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.ByteBufferProvider; import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; import org.junit.Before; From 23d40d8bc0d32205834bdf67b0d85959fe748347 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 6 Dec 2017 19:48:59 -0700 Subject: [PATCH 3/6] Wip --- .../client/transport/TransportClient.java | 6 ++++-- .../common/network/NetworkModule.java | 6 ++++-- .../elasticsearch/common/util/BigArrays.java | 4 ++-- .../common/util/PageCacheRecycler.java | 6 +++--- .../java/org/elasticsearch/node/Node.java | 19 +++++++++++++++---- .../elasticsearch/plugins/NetworkPlugin.java | 2 ++ .../common/network/NetworkModuleTests.java | 8 ++++++-- .../elasticsearch/index/IndexModuleTests.java | 4 +++- .../transport/TcpTransportTests.java | 3 ++- .../elasticsearch/transport/Netty4Plugin.java | 2 ++ .../transport/netty4/Netty4TransportIT.java | 2 ++ .../common/util/MockPageCacheRecycler.java | 2 +- .../java/org/elasticsearch/node/MockNode.java | 14 ++++++++++++-- .../elasticsearch/test/ESIntegTestCase.java | 2 +- .../org/elasticsearch/test/ESTestCase.java | 8 +++++--- .../transport/MockTcpTransportPlugin.java | 2 ++ .../transport/nio/InboundChannelBuffer.java | 2 +- .../transport/nio/NioTransport.java | 13 ++++++++++--- .../transport/nio/NioTransportPlugin.java | 5 ++++- .../transport/nio/WriteOperation.java | 4 ++-- .../nio/SimpleNioTransportTests.java | 6 ++++-- 21 files changed, 87 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 182c253221483..0d4a87f3bfa24 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.InternalSettingsPreparer; @@ -169,11 +170,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); - BigArrays bigArrays = new BigArrays(settings, circuitBreakerService); + PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); + BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService); resourcesToClose.add(bigArrays); modules.add(settingsModule); NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, - bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null); + bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = new TransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 8cb13647fb6af..2b9cde65d64cc 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.http.HttpServerTransport; @@ -107,6 +108,7 @@ public final class NetworkModule { */ public NetworkModule(Settings settings, boolean transportClient, List plugins, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, @@ -121,9 +123,9 @@ public NetworkModule(Settings settings, boolean transportClient, List> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays, + Map> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); - for (Map.Entry> entry : httpTransportFactory.entrySet()) { + for (Map.Entry> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); } List transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry, diff --git a/core/src/main/java/org/elasticsearch/common/util/BigArrays.java b/core/src/main/java/org/elasticsearch/common/util/BigArrays.java index 5c539a791cf6b..ec46a2b937f40 100644 --- a/core/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/core/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -372,9 +372,9 @@ public T set(long index, T value) { final boolean checkBreaker; private final BigArrays circuitBreakingInstance; - public BigArrays(Settings settings, @Nullable final CircuitBreakerService breakerService) { + public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) { // Checking the breaker is disabled if not specified - this(new PageCacheRecycler(settings), breakerService, false); + this(recycler, breakerService, false); } // public for tests diff --git a/core/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java b/core/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java index f40938b8ec0c8..be69cf5b95a06 100644 --- a/core/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java +++ b/core/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java @@ -65,10 +65,10 @@ public void close() { Releasables.close(true, bytePage, intPage, longPage, objectPage); } - protected PageCacheRecycler(Settings settings) { + public PageCacheRecycler(Settings settings) { super(settings); - final Type type = TYPE_SETTING .get(settings); - final long limit = LIMIT_HEAP_SETTING .get(settings).getBytes(); + final Type type = TYPE_SETTING.get(settings); + final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes(); final int availableProcessors = EsExecutors.numberOfProcessors(settings); // We have a global amount of memory that we need to divide across data types. diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index a4b7e5147d5e8..8372f985df0c3 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -80,6 +80,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryModule; @@ -363,7 +364,8 @@ protected Node(final Environment environment, Collection modules.add(new GatewayModule()); - BigArrays bigArrays = createBigArrays(settings, circuitBreakerService); + PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); + BigArrays bigArrays = createBigArrays(settings, pageCacheRecycler, circuitBreakerService); resourcesToClose.add(bigArrays); modules.add(settingsModule); List namedWriteables = Stream.of( @@ -403,7 +405,8 @@ protected Node(final Environment environment, Collection final RestController restController = actionModule.getRestController(); final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class), - threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController); + threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, + networkService, restController); Collection>> customMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getCustomMetaDataUpgrader) @@ -898,8 +901,16 @@ public static CircuitBreakerService createCircuitBreakerService(Settings setting * Creates a new {@link BigArrays} instance used for this node. * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing */ - BigArrays createBigArrays(Settings settings, CircuitBreakerService circuitBreakerService) { - return new BigArrays(settings, circuitBreakerService); + BigArrays createBigArrays(Settings settings, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { + return new BigArrays(pageCacheRecycler, circuitBreakerService); + } + + /** + * Creates a new {@link BigArrays} instance used for this node. + * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing + */ + PageCacheRecycler createPageCacheRecycler(Settings settings) { + return new PageCacheRecycler(settings); } /** diff --git a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index 33fab61c24a79..df41036ffeabb 100644 --- a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpServerTransport; @@ -58,6 +59,7 @@ default List getTransportInterceptors(NamedWriteableRegist * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation. */ default Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index 81e685e68529e..70deb8a4ba88e 100644 --- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpInfo; @@ -133,6 +134,7 @@ public void testRegisterTransport() { NetworkPlugin plugin = new NetworkPlugin() { @Override public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -193,6 +195,7 @@ public void testOverrideDefault() { NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() { @Override public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -227,6 +230,7 @@ public void testDefaultKeys() { NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() { @Override public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -306,7 +310,7 @@ public List getTransportInterceptors(NamedWriteableRegistr } private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) { - return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null, - new NullDispatcher()); + return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, null, + xContentRegistry(), null, new NullDispatcher()); } } diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index f1037d67ff4aa..c7d396c778a3d 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -124,7 +125,8 @@ public void setUp() throws Exception { emptyMap(), emptyMap(), emptyMap()); threadPool = new TestThreadPool("test"); circuitBreakerService = new NoneCircuitBreakerService(); - bigArrays = new BigArrays(settings, circuitBreakerService); + PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); + bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService); scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap()); clusterService = ClusterServiceUtils.createClusterService(threadPool); nodeEnvironment = new NodeEnvironment(settings, environment); diff --git a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 5182951a0fd53..56fe7a5ebecd0 100644 --- a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; @@ -177,7 +178,7 @@ public void testCompressRequest() throws IOException { try { TcpTransport transport = new TcpTransport( "test", Settings.builder().put("transport.tcp.compress", compressed).build(), threadPool, - new BigArrays(Settings.EMPTY, null), null, null, null) { + new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) { @Override protected FakeChannel bind(String name, InetSocketAddress address) throws IOException { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index 4c842d5a4dca7..c6655b58bc3bd 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.netty4.Netty4HttpServerTransport; @@ -76,6 +77,7 @@ public Settings additionalSettings() { @Override public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java index 04a2b8131f9ee..b81c8efcb47ee 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -90,6 +91,7 @@ public static class TestPlugin extends Plugin implements NetworkPlugin { @Override public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java index 766d68f45dd00..5fcf2f11d0ed0 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java @@ -57,7 +57,7 @@ public static void ensureAllPagesAreReleased() throws Exception { private final Random random; - MockPageCacheRecycler(Settings settings) { + public MockPageCacheRecycler(Settings settings) { super(settings); // we always initialize with 0 here since we really only wanna have some random bytes / ints / longs // and given the fact that it's called concurrently it won't reproduces anyway the same order other than in a unittest diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 34ed8d337e800..ec6645c9a2eae 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -30,6 +30,8 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -82,13 +84,21 @@ public Collection> getClasspathPlugins() { } @Override - protected BigArrays createBigArrays(Settings settings, CircuitBreakerService circuitBreakerService) { + protected BigArrays createBigArrays(Settings settings, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { if (getPluginsService().filterPlugins(NodeMocksPlugin.class).isEmpty()) { - return super.createBigArrays(settings, circuitBreakerService); + return super.createBigArrays(settings, pageCacheRecycler, circuitBreakerService); } return new MockBigArrays(settings, circuitBreakerService); } + @Override + PageCacheRecycler createPageCacheRecycler(Settings settings) { + if (getPluginsService().filterPlugins(NodeMocksPlugin.class).isEmpty()) { + return super.createPageCacheRecycler(settings); + } + return new MockPageCacheRecycler(settings); + } + @Override protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 1dbf40155461b..be6a1b29681da 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -2076,7 +2076,7 @@ public static void afterClass() throws Exception { try { INSTANCE.printTestMessage("cleaning up after"); INSTANCE.afterInternal(true); - checkStaticState(); + checkStaticState(true); } finally { INSTANCE = null; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index e10411e5a435e..01ad6763a8cb6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -288,7 +288,7 @@ protected boolean enableWarningsCheck() { @After public final void after() throws Exception { - checkStaticState(); + checkStaticState(false); // We check threadContext != null rather than enableWarningsCheck() // because after methods are still called in the event that before // methods failed, in which case threadContext might not have been @@ -394,8 +394,10 @@ public void log(StatusData data) { } // separate method so that this can be checked again after suite scoped cluster is shut down - protected static void checkStaticState() throws Exception { - MockPageCacheRecycler.ensureAllPagesAreReleased(); + protected static void checkStaticState(boolean afterClass) throws Exception { + if (afterClass) { + MockPageCacheRecycler.ensureAllPagesAreReleased(); + } MockBigArrays.ensureAllArraysAreReleased(); // ensure no one changed the status logger level on us diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransportPlugin.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransportPlugin.java index 25c356ebf4d1f..c85fee985feee 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransportPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransportPlugin.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -39,6 +40,7 @@ public class MockTcpTransportPlugin extends Plugin implements NetworkPlugin { @Override public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 5e92a7493c520..51c3ff53aee29 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -93,7 +93,7 @@ public void release(long bytesToRelease) { int pagesToRelease = pageIndex(offset + bytesToRelease); for (int i = 0; i < pagesToRelease; i++) { - pages.removeFirst(); + pages.removeFirst().close(); } capacity -= bytesToRelease; internalIndex = Math.max(internalIndex - bytesToRelease, 0); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index e02ef72a5b287..c39e4517b8b58 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -24,10 +24,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; @@ -69,6 +71,7 @@ public class NioTransport extends TcpTransport { intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); private final OpenChannels openChannels = new OpenChannels(logger); + private final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private final ArrayList acceptors = new ArrayList<>(); private final ArrayList socketSelectors = new ArrayList<>(); @@ -77,8 +80,10 @@ public class NioTransport extends TcpTransport { private int acceptorNumber; public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, - NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { + PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, + CircuitBreakerService circuitBreakerService) { super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); + this.pageCacheRecycler = pageCacheRecycler; } @Override @@ -186,8 +191,10 @@ final void exceptionCaught(NioSocketChannel channel, Exception exception) { private Consumer getContextSetter(String profileName) { return (c) -> { - Supplier pageSupplier = () -> - new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); + Supplier pageSupplier = () -> { + Recycler.V bytes = pageCacheRecycler.bytePage(false); + return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes); + }; c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), new InboundChannelBuffer(pageSupplier)), new TcpWriteContext(c), this::exceptionCaught); }; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index 733351fe429d3..e158fe6fe97c3 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -38,6 +39,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { @Override public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -49,6 +51,7 @@ public Map> getTransports(Settings settings, ThreadP settings1 = settings; } return Collections.singletonMap(NIO_TRANSPORT_NAME, - () -> new NioTransport(settings1, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService)); + () -> new NioTransport(settings1, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, + circuitBreakerService)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java index 0abb6a6765046..1b2f2cfede4f0 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java @@ -81,9 +81,9 @@ private ByteBuffer[] getBuffersToWrite() { ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex]; - ByteBuffer firstBuffer = buffers[0].duplicate(); + ByteBuffer firstBuffer = buffers[offsetIndex].duplicate(); firstBuffer.position(internalIndex - offsets[offsetIndex]); - postIndexBuffers[offsetIndex] = firstBuffer; + postIndexBuffers[0] = firstBuffer; int j = 1; for (int i = (offsetIndex + 1); i < buffers.length; ++i) { postIndexBuffers[j++] = buffers[i].duplicate(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 55bca45d1c81f..1cff80dec793f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -28,6 +28,8 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.test.transport.MockTransportService; @@ -57,8 +59,8 @@ public static MockTransportService nioFromThreadPool(Settings settings, ThreadPo NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList()); Transport transport = new NioTransport(settings, threadPool, - networkService, - BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { + networkService, BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry, + new NoneCircuitBreakerService()) { @Override protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, From 82042414b72fd7b4d33ee22c6b7dc5fd609f2f5d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 6 Dec 2017 21:55:49 -0700 Subject: [PATCH 4/6] Fix checkstyle and ctor of mockbigarrays --- core/src/main/java/org/elasticsearch/node/Node.java | 4 ++-- .../common/io/stream/ReleasableBytesStreamOutputTests.java | 3 ++- .../java/org/elasticsearch/common/util/BigArraysTests.java | 2 +- .../org/elasticsearch/common/util/BytesRefHashTests.java | 2 +- .../java/org/elasticsearch/common/util/LongHashTests.java | 2 +- .../elasticsearch/common/util/LongObjectHashMapTests.java | 2 +- .../org/elasticsearch/search/DefaultSearchContextTests.java | 3 ++- .../aggregations/MultiBucketAggregatorWrapperTests.java | 3 ++- .../bucket/sampler/BestDocsDeferringCollectorTests.java | 5 +++-- .../aggregations/bucket/terms/TermsAggregatorTests.java | 3 ++- .../metrics/cardinality/InternalCardinalityTests.java | 5 +++-- .../aggregations/matrix/stats/InternalMatrixStatsTests.java | 3 ++- .../elasticsearch/http/netty4/Netty4HttpChannelTests.java | 3 ++- .../http/netty4/Netty4HttpServerPipeliningTests.java | 3 ++- .../http/netty4/Netty4HttpServerTransportTests.java | 3 ++- .../transport/netty4/Netty4SizeHeaderFrameDecoderTests.java | 3 ++- .../transport/netty4/NettyTransportMultiPortTests.java | 3 ++- .../java/org/elasticsearch/common/util/MockBigArrays.java | 4 ++-- .../src/main/java/org/elasticsearch/node/MockNode.java | 6 +++--- .../search/aggregations/AggregatorTestCase.java | 3 ++- .../org/elasticsearch/test/InternalAggregationTestCase.java | 3 ++- 21 files changed, 41 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 8372f985df0c3..2a677d00afe46 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -365,7 +365,7 @@ protected Node(final Environment environment, Collection PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); - BigArrays bigArrays = createBigArrays(settings, pageCacheRecycler, circuitBreakerService); + BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); resourcesToClose.add(bigArrays); modules.add(settingsModule); List namedWriteables = Stream.of( @@ -901,7 +901,7 @@ public static CircuitBreakerService createCircuitBreakerService(Settings setting * Creates a new {@link BigArrays} instance used for this node. * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing */ - BigArrays createBigArrays(Settings settings, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { + BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { return new BigArrays(pageCacheRecycler, circuitBreakerService); } diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java index 557721a0241ab..5a85310adcdf6 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -30,7 +31,7 @@ public class ReleasableBytesStreamOutputTests extends ESTestCase { public void testRelease() throws Exception { MockBigArrays mockBigArrays = - new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); try (ReleasableBytesStreamOutput output = getRandomReleasableBytesStreamOutput(mockBigArrays)) { output.writeBoolean(randomBoolean()); diff --git a/core/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/core/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 945dda446ce3a..f280b9a80bf4b 100644 --- a/core/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -42,7 +42,7 @@ public class BigArraysTests extends ESTestCase { private BigArrays randombigArrays() { - return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); } private BigArrays bigArrays; diff --git a/core/src/test/java/org/elasticsearch/common/util/BytesRefHashTests.java b/core/src/test/java/org/elasticsearch/common/util/BytesRefHashTests.java index 78abed0b32050..5370815926c68 100644 --- a/core/src/test/java/org/elasticsearch/common/util/BytesRefHashTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/BytesRefHashTests.java @@ -41,7 +41,7 @@ public class BytesRefHashTests extends ESSingleNodeTestCase { BytesRefHash hash; private BigArrays randombigArrays() { - return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); } private void newHash() { diff --git a/core/src/test/java/org/elasticsearch/common/util/LongHashTests.java b/core/src/test/java/org/elasticsearch/common/util/LongHashTests.java index 66c9e689c9634..708e7beb861ec 100644 --- a/core/src/test/java/org/elasticsearch/common/util/LongHashTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/LongHashTests.java @@ -36,7 +36,7 @@ public class LongHashTests extends ESSingleNodeTestCase { LongHash hash; private BigArrays randombigArrays() { - return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); } private void newHash() { diff --git a/core/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java b/core/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java index f783317808d04..9210565a10482 100644 --- a/core/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java @@ -27,7 +27,7 @@ public class LongObjectHashMapTests extends ESSingleNodeTestCase { private BigArrays randombigArrays() { - return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); } public void testDuel() { diff --git a/core/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/core/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 539bdef17d3de..ec422435e4e07 100644 --- a/core/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/core/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.cache.IndexCache; @@ -104,7 +105,7 @@ public void testPreProcess() throws Exception { when(indexService.getIndexSettings()).thenReturn(indexSettings); when(mapperService.getIndexSettings()).thenReturn(indexSettings); - BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/MultiBucketAggregatorWrapperTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/MultiBucketAggregatorWrapperTests.java index 0a83b2ec5c794..9c2ae31af14f8 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/MultiBucketAggregatorWrapperTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/MultiBucketAggregatorWrapperTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ESTestCase; @@ -46,7 +47,7 @@ public class MultiBucketAggregatorWrapperTests extends ESTestCase { public void testNoNullScorerIsDelegated() throws Exception { LeafReaderContext leafReaderContext = MemoryIndex.fromDocument(Collections.emptyList(), new MockAnalyzer(random())) .createSearcher().getIndexReader().leaves().get(0); - BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); SearchContext searchContext = mock(SearchContext.class); when(searchContext.bigArrays()).thenReturn(bigArrays); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java index d99f7e9fa73c1..86e937a356b46 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.Directory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketCollector; @@ -63,8 +64,8 @@ public void testReplay() throws Exception { TermQuery termQuery = new TermQuery(new Term("field", String.valueOf(randomInt(maxNumValues)))); TopDocs topDocs = indexSearcher.search(termQuery, numDocs); - BestDocsDeferringCollector collector = - new BestDocsDeferringCollector(numDocs, new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService())); + BestDocsDeferringCollector collector = new BestDocsDeferringCollector(numDocs, + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService())); Set deferredCollectedDocIds = new HashSet<>(); collector.setDeferredCollector(Collections.singleton(testCollector(deferredCollectedDocIds))); collector.preCollection(); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index 9dd355d8ca054..74cc35da30030 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.IpFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; @@ -912,7 +913,7 @@ public void testMixLongAndDouble() throws Exception { dir.close(); } InternalAggregation.ReduceContext ctx = - new InternalAggregation.ReduceContext(new MockBigArrays(Settings.EMPTY, + new InternalAggregation.ReduceContext(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), null, true); for (InternalAggregation internalAgg : aggs) { InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinalityTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinalityTests.java index 66e9644cb429d..fc1095c857fa4 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinalityTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinalityTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.aggregations.ParsedAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -60,7 +61,7 @@ public void tearDown() throws Exception { protected InternalCardinality createTestInstance(String name, List pipelineAggregators, Map metaData) { HyperLogLogPlusPlus hllpp = new HyperLogLogPlusPlus(p, - new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()), 1); + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), 1); algos.add(hllpp); for (int i = 0; i < 100; i++) { hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100))); @@ -107,7 +108,7 @@ protected InternalCardinality mutateInstance(InternalCardinality instance) { break; case 1: HyperLogLogPlusPlus newState = new HyperLogLogPlusPlus(state.precision(), - new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()), 0); + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), 0); newState.merge(0, state, 0); int extraValues = between(10, 100); for (int i = 0; i < extraValues; i++) { diff --git a/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStatsTests.java b/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStatsTests.java index 6ff132b32fa29..428889b1f67fd 100644 --- a/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStatsTests.java +++ b/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStatsTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -160,7 +161,7 @@ public void testReduceRandom() { multiPassStats.computeStats(aValues, bValues); ScriptService mockScriptService = mockScriptService(); - MockBigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, mockScriptService, true); InternalMatrixStats reduced = (InternalMatrixStats) shardResults.get(0).reduce(shardResults, context); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java index 4b086ab465001..e9de4ef50a5a4 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -99,7 +100,7 @@ public class Netty4HttpChannelTests extends ESTestCase { public void setup() throws Exception { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); - bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); } @After diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index 4fdd842cb195e..91a5465f6a764 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.NullDispatcher; @@ -74,7 +75,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { public void setup() throws Exception { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); - bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); } @After diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 846c59565c245..36c00b13c2262 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpServerTransport; @@ -93,7 +94,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase { public void setup() throws Exception { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); - bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); } @After diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 7c56fdc3ab4f4..7343da6c3b11a 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.mocksocket.MockSocket; import org.elasticsearch.test.ESTestCase; @@ -63,7 +64,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase { public void startThreadPool() { threadPool = new ThreadPool(settings); NetworkService networkService = new NetworkService(Collections.emptyList()); - BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); nettyTransport = new Netty4Transport(settings, threadPool, networkService, bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); nettyTransport.start(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index cde939bab8dd0..a49df3caaba4e 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -116,7 +117,7 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc } private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { - BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()), bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); transport.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index 2f75d92d89829..9eeca0bd12d05 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -70,8 +70,8 @@ public static void ensureAllArraysAreReleased() throws Exception { private final PageCacheRecycler recycler; private final CircuitBreakerService breakerService; - public MockBigArrays(Settings settings, CircuitBreakerService breakerService) { - this(new MockPageCacheRecycler(settings), breakerService, false); + public MockBigArrays(PageCacheRecycler recycler, CircuitBreakerService breakerService) { + this(recycler, breakerService, false); } private MockBigArrays(PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) { diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index ec6645c9a2eae..5a6075b1f8571 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -84,11 +84,11 @@ public Collection> getClasspathPlugins() { } @Override - protected BigArrays createBigArrays(Settings settings, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { + protected BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) { if (getPluginsService().filterPlugins(NodeMocksPlugin.class).isEmpty()) { - return super.createBigArrays(settings, pageCacheRecycler, circuitBreakerService); + return super.createBigArrays(pageCacheRecycler, circuitBreakerService); } - return new MockBigArrays(settings, circuitBreakerService); + return new MockBigArrays(pageCacheRecycler, circuitBreakerService); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 8ae6c7843d2e8..f34b1c6e79f69 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; @@ -112,7 +113,7 @@ protected AggregatorFactory createAggregatorFactory(AggregationBuilder aggreg CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); when(searchContext.aggregations()) .thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer)); - when(searchContext.bigArrays()).thenReturn(new MockBigArrays(Settings.EMPTY, circuitBreakerService)); + when(searchContext.bigArrays()).thenReturn(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), circuitBreakerService)); // TODO: now just needed for top_hits, this will need to be revised for other agg unit tests: MapperService mapperService = mapperServiceMock(); when(mapperService.getIndexSettings()).thenReturn(indexSettings); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 9de83aee63001..ea846c5dd1841 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -236,7 +237,7 @@ public void testReduceRandom() { toReduce.add(t); } ScriptService mockScriptService = mockScriptService(); - MockBigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); if (randomBoolean() && toReduce.size() > 1) { // sometimes do an incremental reduce Collections.shuffle(toReduce, random()); From 5c7e2d0135f5e9122efd8a28b3a7217d6dacec01 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 7 Dec 2017 09:31:27 -0700 Subject: [PATCH 5/6] Changes based on review --- .../transport/nio/InboundChannelBuffer.java | 46 +++++++----- .../nio/InboundChannelBufferTests.java | 74 +++++++++++++++++-- .../nio/channel/TcpReadContextTests.java | 6 +- 3 files changed, 100 insertions(+), 26 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 51c3ff53aee29..5816ffb9ace2d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; /** @@ -35,7 +36,7 @@ */ public final class InboundChannelBuffer implements Releasable { - private static final int PAGE_SIZE = 1 << 14; + private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; private static final int PAGE_MASK = PAGE_SIZE - 1; private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; @@ -43,16 +44,13 @@ public final class InboundChannelBuffer implements Releasable { private final ArrayDeque pages; private final Supplier pageSupplier; + private final AtomicBoolean isClosed = new AtomicBoolean(false); private long capacity = 0; private long internalIndex = 0; // The offset is an int as it is the offset of where the bytes begin in the first buffer private int offset = 0; - public InboundChannelBuffer() { - this(() -> new Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {})); - } - public InboundChannelBuffer(Supplier pageSupplier) { this.pageSupplier = pageSupplier; this.pages = new ArrayDeque<>(); @@ -62,13 +60,30 @@ public InboundChannelBuffer(Supplier pageSupplier) { @Override public void close() { - Page page; - while ((page = pages.pollFirst()) != null) { - page.close(); + if (isClosed.compareAndSet(false, true)) { + Page page; + RuntimeException closingException = null; + while ((page = pages.pollFirst()) != null) { + try { + page.close(); + } catch (RuntimeException e) { + if (closingException == null) { + closingException = e; + } else { + closingException.addSuppressed(e); + } + } + } + if (closingException != null) { + throw closingException; + } } } public void ensureCapacity(long requiredCapacity) { + if (isClosed.get()) { + throw new IllegalStateException("Cannot allocate new pages if the buffer is closed."); + } if (capacity < requiredCapacity) { int numPages = numPages(requiredCapacity + offset); int pagesToAdd = numPages - pages.size(); @@ -125,11 +140,11 @@ public ByteBuffer[] sliceBuffersTo(long to) { ByteBuffer[] buffers = new ByteBuffer[pageCount]; Iterator pageIterator = pages.iterator(); - ByteBuffer firstBuffer = pageIterator.next().byteBuffer().duplicate(); + ByteBuffer firstBuffer = pageIterator.next().byteBuffer.duplicate(); firstBuffer.position(firstBuffer.position() + offset); buffers[0] = firstBuffer; for (int i = 1; i < buffers.length; i++) { - buffers[i] = pageIterator.next().byteBuffer().duplicate(); + buffers[i] = pageIterator.next().byteBuffer.duplicate(); } if (finalLimit != 0) { buffers[buffers.length - 1].limit(finalLimit); @@ -162,9 +177,9 @@ public ByteBuffer[] sliceBuffersFrom(long from) { ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex]; Iterator pageIterator = pages.descendingIterator(); for (int i = buffers.length - 1; i > 0; --i) { - buffers[i] = pageIterator.next().byteBuffer().duplicate(); + buffers[i] = pageIterator.next().byteBuffer.duplicate(); } - ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer().duplicate(); + ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer.duplicate(); firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); buffers[0] = firstPostIndexBuffer; @@ -214,12 +229,12 @@ private int indexInPage(long index) { return (int) (index & PAGE_MASK); } - static class Page implements Releasable { + public static class Page implements Releasable { private final ByteBuffer byteBuffer; private final Releasable releasable; - Page(ByteBuffer byteBuffer, Releasable releasable) { + public Page(ByteBuffer byteBuffer, Releasable releasable) { this.byteBuffer = byteBuffer; this.releasable = releasable; } @@ -229,8 +244,5 @@ public void close() { releasable.close(); } - private ByteBuffer byteBuffer() { - return byteBuffer; - } } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java index 7232a93871001..9620f4c9c7660 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java @@ -19,16 +19,22 @@ package org.elasticsearch.transport.nio; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.ESTestCase; import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; public class InboundChannelBufferTests extends ESTestCase { - private static final int PAGE_SIZE = 1 << 14; + private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES; + private final Supplier defaultPageSupplier = () -> + new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); public void testNewBufferHasSinglePage() { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); @@ -36,7 +42,7 @@ public void testNewBufferHasSinglePage() { } public void testExpandCapacity() { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); @@ -48,7 +54,7 @@ public void testExpandCapacity() { } public void testExpandCapacityMultiplePages() { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); @@ -60,7 +66,7 @@ public void testExpandCapacityMultiplePages() { } public void testExpandCapacityRespectsOffset() { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); @@ -79,7 +85,7 @@ public void testExpandCapacityRespectsOffset() { } public void testIncrementIndex() { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); assertEquals(0, channelBuffer.getIndex()); assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); @@ -91,7 +97,7 @@ public void testIncrementIndex() { } public void testIncrementIndexWithOffset() { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); assertEquals(0, channelBuffer.getIndex()); assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); @@ -109,8 +115,60 @@ public void testIncrementIndexWithOffset() { assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining()); } + public void testReleaseClosesPages() { + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + Supplier supplier = () -> { + AtomicBoolean atomicBoolean = new AtomicBoolean(); + queue.add(atomicBoolean); + return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true)); + }; + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier); + channelBuffer.ensureCapacity(PAGE_SIZE * 4); + + assertEquals(PAGE_SIZE * 4, channelBuffer.getCapacity()); + assertEquals(4, queue.size()); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + channelBuffer.release(2 * PAGE_SIZE); + + assertEquals(PAGE_SIZE * 2, channelBuffer.getCapacity()); + + assertTrue(queue.poll().get()); + assertTrue(queue.poll().get()); + assertFalse(queue.poll().get()); + assertFalse(queue.poll().get()); + } + + public void testClose() { + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + Supplier supplier = () -> { + AtomicBoolean atomicBoolean = new AtomicBoolean(); + queue.add(atomicBoolean); + return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true)); + }; + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier); + channelBuffer.ensureCapacity(PAGE_SIZE * 4); + + assertEquals(4, queue.size()); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + channelBuffer.close(); + + for (AtomicBoolean closedRef : queue) { + assertTrue(closedRef.get()); + } + + expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1)); + } + public void testAccessByteBuffers() { - InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); int pages = randomInt(50) + 5; channelBuffer.ensureCapacity(pages * PAGE_SIZE); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index ad6db666ac371..f24c087e60a30 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; @@ -30,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -50,7 +52,9 @@ public void init() { messageLength = randomInt(96) + 4; channel = mock(TcpNioSocketChannel.class); - readContext = new TcpReadContext(channel, handler, new InboundChannelBuffer()); + Supplier pageSupplier = () -> + new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); + readContext = new TcpReadContext(channel, handler, new InboundChannelBuffer(pageSupplier)); } public void testSuccessfulRead() throws IOException { From 3846ba594db2beb5ea64784fb25b38256e84abf9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 8 Dec 2017 09:31:15 -0700 Subject: [PATCH 6/6] Change based on review --- .../transport/nio/InboundChannelBuffer.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 5816ffb9ace2d..11d340a30a0c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -19,12 +19,15 @@ package org.elasticsearch.transport.nio; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.BigArrays; import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -62,21 +65,15 @@ public InboundChannelBuffer(Supplier pageSupplier) { public void close() { if (isClosed.compareAndSet(false, true)) { Page page; - RuntimeException closingException = null; + List closingExceptions = new ArrayList<>(); while ((page = pages.pollFirst()) != null) { try { page.close(); } catch (RuntimeException e) { - if (closingException == null) { - closingException = e; - } else { - closingException.addSuppressed(e); - } + closingExceptions.add(e); } } - if (closingException != null) { - throw closingException; - } + ExceptionsHelper.rethrowAndSuppress(closingExceptions); } }