diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 0fc135ae49299..5f03d08b4c69b 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.InboundPipeline; import org.elasticsearch.transport.Transports; @@ -53,7 +54,9 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) { this.transport = transport; - this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException); + final ThreadPool threadPool = transport.getThreadPool(); + this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis, + transport::inboundMessage, transport::inboundDecodeException); } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java index f173a35b97251..bf3473a16ce98 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java @@ -29,6 +29,7 @@ import org.elasticsearch.nio.BytesWriteHandler; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.Page; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.InboundPipeline; import org.elasticsearch.transport.TcpTransport; @@ -41,7 +42,9 @@ public class TcpReadWriteHandler extends BytesWriteHandler { public TcpReadWriteHandler(NioTcpChannel channel, PageCacheRecycler recycler, TcpTransport transport) { this.channel = channel; - this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException); + final ThreadPool threadPool = transport.getThreadPool(); + this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis, + transport::inboundMessage, transport::inboundDecodeException); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 82175472a60e0..e42ce862682c3 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -45,7 +44,6 @@ public class InboundHandler { private static final Logger logger = LogManager.getLogger(InboundHandler.class); - private final MeanMetric readBytesMetric = new MeanMetric(); private final ThreadPool threadPool; private final OutboundHandler outboundHandler; private final NamedWriteableRegistry namedWriteableRegistry; @@ -83,10 +81,6 @@ final Transport.ResponseHandlers getResponseHandlers() { return responseHandlers; } - MeanMetric getReadBytes() { - return readBytesMetric; - } - void setMessageListener(TransportMessageListener listener) { if (messageListener == TransportMessageListener.NOOP_LISTENER) { messageListener = listener; @@ -100,19 +94,12 @@ void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception TransportLogger.logInboundMessage(channel, message); if (message.isPing()) { - readBytesMetric.inc(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE); keepAlive.receiveKeepAlive(channel); } else { - readBytesMetric.inc(message.getHeader().getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE); messageReceived(message, channel); } } - void handleDecodeException(TcpChannel channel, Header header) { - channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); - readBytesMetric.inc(header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE); - } - private void messageReceived(InboundMessage message, TcpChannel channel) throws IOException { final InetSocketAddress remoteAddress = channel.getRemoteAddress(); final Header header = message.getHeader(); diff --git a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java index 5501a339775c5..68740b54742d5 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java @@ -31,12 +31,15 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.function.BiConsumer; +import java.util.function.LongSupplier; public class InboundPipeline implements Releasable { private static final ThreadLocal> fragmentList = ThreadLocal.withInitial(ArrayList::new); private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true); + private final LongSupplier relativeTimeInMillis; + private final StatsTracker statsTracker; private final InboundDecoder decoder; private final InboundAggregator aggregator; private final BiConsumer messageHandler; @@ -44,14 +47,18 @@ public class InboundPipeline implements Releasable { private ArrayDeque pending = new ArrayDeque<>(2); private boolean isClosed = false; - public InboundPipeline(Version version, PageCacheRecycler recycler, BiConsumer messageHandler, + public InboundPipeline(Version version, StatsTracker statsTracker, PageCacheRecycler recycler, LongSupplier relativeTimeInMillis, + BiConsumer messageHandler, BiConsumer> errorHandler) { - this(new InboundDecoder(version, recycler), new InboundAggregator(), messageHandler, errorHandler); + this(statsTracker, relativeTimeInMillis, new InboundDecoder(version, recycler), new InboundAggregator(), messageHandler, + errorHandler); } - private InboundPipeline(InboundDecoder decoder, InboundAggregator aggregator, - BiConsumer messageHandler, + private InboundPipeline(StatsTracker statsTracker, LongSupplier relativeTimeInMillis, InboundDecoder decoder, + InboundAggregator aggregator, BiConsumer messageHandler, BiConsumer> errorHandler) { + this.relativeTimeInMillis = relativeTimeInMillis; + this.statsTracker = statsTracker; this.decoder = decoder; this.aggregator = aggregator; this.messageHandler = messageHandler; @@ -67,6 +74,8 @@ public void close() { } public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException { + channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong()); + statsTracker.markBytesRead(reference.length()); pending.add(reference.retain()); final ArrayList fragments = fragmentList.get(); @@ -116,12 +125,14 @@ private void forwardFragments(TcpChannel channel, ArrayList fragments) t } else if (fragment == InboundDecoder.END_CONTENT) { assert aggregator.isAggregating(); try (InboundMessage aggregated = aggregator.finishAggregation()) { + statsTracker.markMessageReceived(); messageHandler.accept(channel, aggregated); } } else if (fragment instanceof Exception) { final Header header; if (aggregator.isAggregating()) { header = aggregator.cancelAggregation(); + statsTracker.markMessageReceived(); } else { header = null; } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index 634600bfca07b..b9b1bbb7a4e89 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.TransportAddress; @@ -46,19 +45,20 @@ final class OutboundHandler { private static final Logger logger = LogManager.getLogger(OutboundHandler.class); - private final MeanMetric transmittedBytesMetric = new MeanMetric(); - private final String nodeName; private final Version version; private final String[] features; + private final StatsTracker statsTracker; private final ThreadPool threadPool; private final BigArrays bigArrays; private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER; - OutboundHandler(String nodeName, Version version, String[] features, ThreadPool threadPool, BigArrays bigArrays) { + OutboundHandler(String nodeName, Version version, String[] features, StatsTracker statsTracker, ThreadPool threadPool, + BigArrays bigArrays) { this.nodeName = nodeName; this.version = version; this.features = features; + this.statsTracker = statsTracker; this.threadPool = threadPool; this.bigArrays = bigArrays; } @@ -137,10 +137,6 @@ private void internalSend(TcpChannel channel, SendContext sendContext) throws IO } - MeanMetric getTransmittedBytes() { - return transmittedBytesMetric; - } - void setMessageListener(TransportMessageListener listener) { if (messageListener == TransportMessageListener.NOOP_LISTENER) { messageListener = listener; @@ -209,7 +205,7 @@ public BytesReference get() throws IOException { @Override protected void innerOnResponse(Void v) { assert messageSize != -1 : "If onResponse is being called, the message should have been serialized"; - transmittedBytesMetric.inc(messageSize); + statsTracker.markBytesWritten(messageSize); closeAndCallback(() -> listener.onResponse(v)); } diff --git a/server/src/main/java/org/elasticsearch/transport/StatsTracker.java b/server/src/main/java/org/elasticsearch/transport/StatsTracker.java new file mode 100644 index 0000000000000..24f07e7209e70 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/StatsTracker.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.metrics.MeanMetric; + +import java.util.concurrent.atomic.LongAdder; + +public class StatsTracker { + + private final LongAdder bytesRead = new LongAdder(); + private final LongAdder messagesReceived = new LongAdder(); + private final MeanMetric writeBytesMetric = new MeanMetric(); + + public void markBytesRead(long bytesReceived) { + bytesRead.add(bytesReceived); + } + + public void markMessageReceived() { + messagesReceived.increment(); + } + + public void markBytesWritten(long bytesWritten) { + writeBytesMetric.inc(bytesWritten); + } + + public long getBytesRead() { + return bytesRead.sum(); + } + + public long getMessagesReceived() { + return messagesReceived.sum(); + } + + + public MeanMetric getWriteBytes() { + return writeBytesMetric; + } + + public long getBytesWritten() { + return writeBytesMetric.sum(); + } + + public long getMessagesSent() { + return writeBytesMetric.count(); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 957db5370e514..76a6e3aa1fa2c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -104,6 +104,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; private static final long THIRTY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.3); + final StatsTracker statsTracker = new StatsTracker(); + // this limit is per-address private static final int LIMIT_LOCAL_PORTS_COUNT = 6; @@ -153,7 +155,7 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P } BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS); - this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays); + this.outboundHandler = new OutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays); this.handshaker = new TransportHandshaker(version, threadPool, (node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId, TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version), @@ -169,6 +171,14 @@ public Version getVersion() { return version; } + public StatsTracker getStatsTracker() { + return statsTracker; + } + + public ThreadPool getThreadPool() { + return threadPool; + } + @Override protected void doStart() { } @@ -685,9 +695,6 @@ public void inboundMessage(TcpChannel channel, InboundMessage message) { } public void inboundDecodeException(TcpChannel channel, Tuple tuple) { - // Need to call inbound handler to mark bytes received. This should eventually be unnecessary as the - // stats marking will move into the pipeline. - inboundHandler.handleDecodeException(channel, tuple.v1()); onException(channel, tuple.v2()); } @@ -836,10 +843,12 @@ private void ensureOpen() { @Override public final TransportStats getStats() { - MeanMetric transmittedBytes = outboundHandler.getTransmittedBytes(); - MeanMetric readBytes = inboundHandler.getReadBytes(); - return new TransportStats(acceptedChannels.size(), readBytes.count(), readBytes.sum(), transmittedBytes.count(), - transmittedBytes.sum()); + final MeanMetric writeBytesMetric = statsTracker.getWriteBytes(); + final long bytesWritten = statsTracker.getBytesWritten(); + final long messagesSent = statsTracker.getMessagesSent(); + final long messagesReceived = statsTracker.getMessagesReceived(); + final long bytesRead = statsTracker.getBytesRead(); + return new TransportStats(acceptedChannels.size(), messagesReceived, bytesRead, messagesSent, bytesWritten); } /** diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index fdf95829a23e7..a6d6a2fa9cf9b 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -60,7 +60,8 @@ public void setUp() throws Exception { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {}, (v, f, c, r, r_id) -> {}); TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage); - OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], threadPool, BigArrays.NON_RECYCLING_INSTANCE); + OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], new StatsTracker(), threadPool, + BigArrays.NON_RECYCLING_INSTANCE); final NoneCircuitBreakerService breaker = new NoneCircuitBreakerService(); handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, breaker, handshaker, keepAlive); } @@ -78,8 +79,6 @@ public void testPing() throws Exception { handler.registerRequestHandler(registry); handler.inboundMessage(channel, new InboundMessage(null, true)); - assertEquals(1, handler.getReadBytes().count()); - assertEquals(6, handler.getReadBytes().sum()); if (channel.isServerChannel()) { BytesReference ping = channel.getMessageCaptor().get(); assertEquals('E', ping.get(0)); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java index 546043c68aa81..c67ff3251e979 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; @@ -38,9 +39,9 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.LongSupplier; import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Mockito.mock; public class InboundPipelineTests extends ESTestCase { @@ -83,10 +84,15 @@ public void testPipelineHandling() throws IOException { }; final PageCacheRecycler recycler = PageCacheRecycler.NON_RECYCLING_INSTANCE; - final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, recycler, messageHandler, errorHandler); + final StatsTracker statsTracker = new StatsTracker(); + final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime()); + final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, statsTracker, recycler, millisSupplier, messageHandler, + errorHandler); + final FakeTcpChannel channel = new FakeTcpChannel(); final int iterations = randomIntBetween(100, 500); long totalMessages = 0; + long bytesReceived = 0; for (int i = 0; i < iterations; ++i) { actual.clear(); @@ -145,7 +151,8 @@ public void testPipelineHandling() throws IOException { final BytesReference slice = networkBytes.slice(currentOffset, bytesToRead); try (ReleasableBytesReference reference = new ReleasableBytesReference(slice, () -> {})) { toRelease.add(reference); - pipeline.handleBytes(mock(TcpChannel.class), reference); + bytesReceived += reference.length(); + pipeline.handleBytes(channel, reference); currentOffset += bytesToRead; } } @@ -171,6 +178,9 @@ public void testPipelineHandling() throws IOException { assertEquals(0, released.refCount()); } } + + assertEquals(bytesReceived, statsTracker.getBytesRead()); + assertEquals(totalMessages, statsTracker.getMessagesReceived()); } } @@ -178,7 +188,10 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException { final PageCacheRecycler recycler = PageCacheRecycler.NON_RECYCLING_INSTANCE; BiConsumer messageHandler = (c, m) -> {}; BiConsumer> errorHandler = (c, e) -> {}; - final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, recycler, messageHandler, errorHandler); + final StatsTracker statsTracker = new StatsTracker(); + final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime()); + final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, statsTracker, recycler, millisSupplier, messageHandler, + errorHandler); try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { String actionName = "actionName"; diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index 2181a62da73a8..ea5c639e8a701 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -46,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import static org.hamcrest.Matchers.instanceOf; @@ -57,6 +59,7 @@ public class OutboundHandlerTests extends ESTestCase { private final TransportRequestOptions options = TransportRequestOptions.EMPTY; private final AtomicReference> message = new AtomicReference<>(); private InboundPipeline pipeline; + private StatsTracker statsTracker; private OutboundHandler handler; private FakeTcpChannel channel; private DiscoveryNode node; @@ -68,15 +71,19 @@ public void setUp() throws Exception { TransportAddress transportAddress = buildNewFakeTransportAddress(); node = new DiscoveryNode("", transportAddress, Version.CURRENT); String[] features = {feature1, feature2}; - handler = new OutboundHandler("node", Version.CURRENT, features, threadPool, BigArrays.NON_RECYCLING_INSTANCE); - pipeline = new InboundPipeline(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE, (c, m) -> { - try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - Streams.copy(m.openOrGetStreamInput(), streamOutput); - message.set(new Tuple<>(m.getHeader(), streamOutput.bytes())); - } catch (IOException e) { - throw new AssertionError(e); - } - }, (c, t) -> { + statsTracker = new StatsTracker(); + handler = new OutboundHandler("node", Version.CURRENT, features, statsTracker, threadPool, BigArrays.NON_RECYCLING_INSTANCE); + + final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime()); + pipeline = new InboundPipeline(Version.CURRENT, new StatsTracker(), PageCacheRecycler.NON_RECYCLING_INSTANCE, millisSupplier, + (c, m) -> { + try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { + Streams.copy(m.openOrGetStreamInput(), streamOutput); + message.set(new Tuple<>(m.getHeader(), streamOutput.bytes())); + } catch (IOException e) { + throw new AssertionError(e); + } + }, (c, t) -> { throw new AssertionError(t.v2()); }); } @@ -150,7 +157,8 @@ public void onRequestSent(DiscoveryNode node, long requestId, String action, Tra assertEquals(action, actionRef.get()); assertEquals(request, requestRef.get()); - pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> {})); + pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> { + })); final Tuple tuple = message.get(); final Header header = tuple.v1(); final TestRequest message = new TestRequest(tuple.v2().streamInput()); @@ -208,7 +216,8 @@ public void onResponseSent(long requestId, String action, TransportResponse resp assertEquals(action, actionRef.get()); assertEquals(response, responseRef.get()); - pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> {})); + pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> { + })); final Tuple tuple = message.get(); final Header header = tuple.v1(); final TestResponse message = new TestResponse(tuple.v2().streamInput()); @@ -266,7 +275,8 @@ public void onResponseSent(long requestId, String action, Exception error) { assertEquals(error, responseRef.get()); - pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> {})); + pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> { + })); final Tuple tuple = message.get(); final Header header = tuple.v1(); assertEquals(version, header.getVersion()); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index aa20af7cadfdb..14217373e0e8e 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -410,7 +410,7 @@ private void testExceptionHandling(boolean startTransport, Exception exception, channel.addCloseListener(listener); TcpTransport.handleException(channel, exception, lifecycle, - new OutboundHandler(randomAlphaOfLength(10), Version.CURRENT, new String[0], testThreadPool, + new OutboundHandler(randomAlphaOfLength(10), Version.CURRENT, new String[0], new StatsTracker(), testThreadPool, BigArrays.NON_RECYCLING_INSTANCE)); if (expectClosed) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 9894872b762f2..e68b265a67211 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -273,8 +273,9 @@ private static class MockTcpReadWriteHandler extends BytesWriteHandler { private MockTcpReadWriteHandler(MockSocketChannel channel, PageCacheRecycler recycler, TcpTransport transport) { this.channel = channel; - this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, - transport::inboundDecodeException); + final ThreadPool threadPool = transport.getThreadPool(); + this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, + threadPool::relativeTimeInMillis, transport::inboundMessage, transport::inboundDecodeException); } @Override