Move network stats marking into InboundPipeline (#54393)
This is a follow-up to #48263. It moves the inbound stats tracking into the InboundPipeline.
Tim-Brooks authored Mar 31, 2020
1 parent b90e491 commit 9d861bf
Showing 12 changed files with 149 additions and 57 deletions.
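
In short, the per-handler byte counters move into a shared StatsTracker that each transport hands to its InboundPipeline, together with a relative-time supplier used to mark channel access. The following is only a rough sketch of that wiring, based on the constructor calls in the diff below; the local variables (transport, recycler, pipeline) are placeholders, not code from the commit:

// Sketch of the new InboundPipeline wiring; `transport` and `recycler` are assumed
// to be a TcpTransport subclass and a PageCacheRecycler, as in the handlers below.
final ThreadPool threadPool = transport.getThreadPool();
final InboundPipeline pipeline = new InboundPipeline(
    transport.getVersion(),            // wire version used by the decoder
    transport.getStatsTracker(),       // shared counters for inbound bytes/messages
    recycler,                          // page recycler used while decoding
    threadPool::relativeTimeInMillis,  // clock for marking channel last-access times
    transport::inboundMessage,         // receives fully aggregated messages
    transport::inboundDecodeException  // receives decode failures
);

Both the Netty4 and NIO channel handlers below follow exactly this pattern.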
@@ -31,6 +31,7 @@
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.Transports;

@@ -53,7 +54,9 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {

Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) {
this.transport = transport;
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
final ThreadPool threadPool = transport.getThreadPool();
this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis,
transport::inboundMessage, transport::inboundDecodeException);
}

@Override
@@ -29,6 +29,7 @@
import org.elasticsearch.nio.BytesWriteHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.Page;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.TcpTransport;

@@ -41,7 +42,9 @@ public class TcpReadWriteHandler extends BytesWriteHandler {

public TcpReadWriteHandler(NioTcpChannel channel, PageCacheRecycler recycler, TcpTransport transport) {
this.channel = channel;
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
final ThreadPool threadPool = transport.getThreadPool();
this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis,
transport::inboundMessage, transport::inboundDecodeException);
}

@Override
@@ -27,7 +27,6 @@
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -44,7 +43,6 @@ public class InboundHandler {

private static final Logger logger = LogManager.getLogger(InboundHandler.class);

private final MeanMetric readBytesMetric = new MeanMetric();
private final ThreadPool threadPool;
private final OutboundHandler outboundHandler;
private final NamedWriteableRegistry namedWriteableRegistry;
@@ -82,10 +80,6 @@ final Transport.ResponseHandlers getResponseHandlers() {
return responseHandlers;
}

MeanMetric getReadBytes() {
return readBytesMetric;
}

void setMessageListener(TransportMessageListener listener) {
if (messageListener == TransportMessageListener.NOOP_LISTENER) {
messageListener = listener;
@@ -99,19 +93,12 @@ void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception
TransportLogger.logInboundMessage(channel, message);

if (message.isPing()) {
readBytesMetric.inc(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
keepAlive.receiveKeepAlive(channel);
} else {
readBytesMetric.inc(message.getHeader().getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
messageReceived(message, channel);
}
}

void handleDecodeException(TcpChannel channel, Header header) {
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
readBytesMetric.inc(header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
}

private void messageReceived(InboundMessage message, TcpChannel channel) throws IOException {
final InetSocketAddress remoteAddress = channel.getRemoteAddress();
final Header header = message.getHeader();
@@ -31,27 +31,34 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

public class InboundPipeline implements Releasable {

private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);

private final LongSupplier relativeTimeInMillis;
private final StatsTracker statsTracker;
private final InboundDecoder decoder;
private final InboundAggregator aggregator;
private final BiConsumer<TcpChannel, InboundMessage> messageHandler;
private final BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler;
private ArrayDeque<ReleasableBytesReference> pending = new ArrayDeque<>(2);
private boolean isClosed = false;

public InboundPipeline(Version version, PageCacheRecycler recycler, BiConsumer<TcpChannel, InboundMessage> messageHandler,
public InboundPipeline(Version version, StatsTracker statsTracker, PageCacheRecycler recycler, LongSupplier relativeTimeInMillis,
BiConsumer<TcpChannel, InboundMessage> messageHandler,
BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler) {
this(new InboundDecoder(version, recycler), new InboundAggregator(), messageHandler, errorHandler);
this(statsTracker, relativeTimeInMillis, new InboundDecoder(version, recycler), new InboundAggregator(), messageHandler,
errorHandler);
}

private InboundPipeline(InboundDecoder decoder, InboundAggregator aggregator,
BiConsumer<TcpChannel, InboundMessage> messageHandler,
private InboundPipeline(StatsTracker statsTracker, LongSupplier relativeTimeInMillis, InboundDecoder decoder,
InboundAggregator aggregator, BiConsumer<TcpChannel, InboundMessage> messageHandler,
BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler) {
this.relativeTimeInMillis = relativeTimeInMillis;
this.statsTracker = statsTracker;
this.decoder = decoder;
this.aggregator = aggregator;
this.messageHandler = messageHandler;
@@ -67,6 +74,8 @@ public void close() {
}

public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
statsTracker.markBytesRead(reference.length());
pending.add(reference.retain());

final ArrayList<Object> fragments = fragmentList.get();
@@ -116,12 +125,14 @@ private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) t
} else if (fragment == InboundDecoder.END_CONTENT) {
assert aggregator.isAggregating();
try (InboundMessage aggregated = aggregator.finishAggregation()) {
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
}
} else if (fragment instanceof Exception) {
final Header header;
if (aggregator.isAggregating()) {
header = aggregator.cancelAggregation();
statsTracker.markMessageReceived();
} else {
header = null;
}
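
With this change, handleBytes performs the bookkeeping that previously lived in InboundHandler: every call marks the channel as accessed and counts the incoming bytes, while forwardFragments counts a message whenever an aggregation finishes or is cancelled by a decode error. A minimal sketch of the observable effect, assuming a pipeline, statsTracker, and channel wired as in the sketch near the top of this page:

// Sketch only: shows the bookkeeping added above, not a full transport round-trip.
// `reference` is a ReleasableBytesReference wrapping bytes read off the wire.
final long bytesBefore = statsTracker.getBytesRead();

pipeline.handleBytes(channel, reference); // marks the channel accessed and counts the bytes

assert statsTracker.getBytesRead() == bytesBefore + reference.length();
// getMessagesReceived() advances only once enough fragments have arrived for
// forwardFragments(...) to finish (or cancel) an aggregation.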
@@ -31,7 +31,6 @@
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
@@ -45,17 +44,17 @@ final class OutboundHandler {

private static final Logger logger = LogManager.getLogger(OutboundHandler.class);

private final MeanMetric transmittedBytesMetric = new MeanMetric();

private final String nodeName;
private final Version version;
private final StatsTracker statsTracker;
private final ThreadPool threadPool;
private final BigArrays bigArrays;
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;

OutboundHandler(String nodeName, Version version, ThreadPool threadPool, BigArrays bigArrays) {
OutboundHandler(String nodeName, Version version, StatsTracker statsTracker, ThreadPool threadPool, BigArrays bigArrays) {
this.nodeName = nodeName;
this.version = version;
this.statsTracker = statsTracker;
this.threadPool = threadPool;
this.bigArrays = bigArrays;
}
@@ -133,10 +132,6 @@ private void internalSend(TcpChannel channel, SendContext sendContext) throws IO

}

MeanMetric getTransmittedBytes() {
return transmittedBytesMetric;
}

void setMessageListener(TransportMessageListener listener) {
if (messageListener == TransportMessageListener.NOOP_LISTENER) {
messageListener = listener;
@@ -205,7 +200,7 @@ public BytesReference get() throws IOException {
@Override
protected void innerOnResponse(Void v) {
assert messageSize != -1 : "If onResponse is being called, the message should have been serialized";
transmittedBytesMetric.inc(messageSize);
statsTracker.markBytesWritten(messageSize);
closeAndCallback(() -> listener.onResponse(v));
}

64 changes: 64 additions & 0 deletions server/src/main/java/org/elasticsearch/transport/StatsTracker.java
@@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.elasticsearch.common.metrics.MeanMetric;

import java.util.concurrent.atomic.LongAdder;

public class StatsTracker {

private final LongAdder bytesRead = new LongAdder();
private final LongAdder messagesReceived = new LongAdder();
private final MeanMetric writeBytesMetric = new MeanMetric();

public void markBytesRead(long bytesReceived) {
bytesRead.add(bytesReceived);
}

public void markMessageReceived() {
messagesReceived.increment();
}

public void markBytesWritten(long bytesWritten) {
writeBytesMetric.inc(bytesWritten);
}

public long getBytesRead() {
return bytesRead.sum();
}

public long getMessagesReceived() {
return messagesReceived.sum();
}


public MeanMetric getWriteBytes() {
return writeBytesMetric;
}

public long getBytesWritten() {
return writeBytesMetric.sum();
}

public long getMessagesSent() {
return writeBytesMetric.count();
}
}
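
StatsTracker centralizes both directions of traffic accounting. The read-side counters are plain LongAdders, presumably because they are bumped on every inbound buffer from the network threads, while the write side keeps the existing MeanMetric so the transmitted message count and byte sum stay paired as before. A small self-contained usage sketch (not part of the commit):

import org.elasticsearch.transport.StatsTracker;

public class StatsTrackerExample {
    public static void main(String[] args) {
        StatsTracker tracker = new StatsTracker();

        // Inbound side, as driven by InboundPipeline.handleBytes(...):
        tracker.markBytesRead(1024);   // one 1 KiB network read
        tracker.markMessageReceived(); // one fully aggregated message

        // Outbound side, as driven by OutboundHandler's send listener:
        tracker.markBytesWritten(512); // one flushed 512-byte message

        assert tracker.getBytesRead() == 1024;
        assert tracker.getMessagesReceived() == 1;
        assert tracker.getBytesWritten() == 512;
        assert tracker.getMessagesSent() == 1; // the MeanMetric count doubles as the message count
    }
}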
25 changes: 17 additions & 8 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
@@ -103,6 +103,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
private static final long THIRTY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.3);

final StatsTracker statsTracker = new StatsTracker();

// this limit is per-address
private static final int LIMIT_LOCAL_PORTS_COUNT = 6;

@@ -139,7 +141,7 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P
String nodeName = Node.NODE_NAME_SETTING.get(settings);
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);

this.outboundHandler = new OutboundHandler(nodeName, version, threadPool, bigArrays);
this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, bigArrays);
this.handshaker = new TransportHandshaker(ClusterName.CLUSTER_NAME_SETTING.get(settings), version, threadPool,
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
@@ -155,6 +157,14 @@ public Version getVersion() {
return version;
}

public StatsTracker getStatsTracker() {
return statsTracker;
}

public ThreadPool getThreadPool() {
return threadPool;
}

@Override
protected void doStart() {
}
@@ -676,9 +686,6 @@ public void inboundMessage(TcpChannel channel, InboundMessage message) {
}

public void inboundDecodeException(TcpChannel channel, Tuple<Header, Exception> tuple) {
// Need to call inbound handler to mark bytes received. This should eventually be unnecessary as the
// stats marking will move into the pipeline.
inboundHandler.handleDecodeException(channel, tuple.v1());
onException(channel, tuple.v2());
}

@@ -827,10 +834,12 @@ private void ensureOpen() {

@Override
public final TransportStats getStats() {
MeanMetric transmittedBytes = outboundHandler.getTransmittedBytes();
MeanMetric readBytes = inboundHandler.getReadBytes();
return new TransportStats(acceptedChannels.size(), readBytes.count(), readBytes.sum(), transmittedBytes.count(),
transmittedBytes.sum());
final MeanMetric writeBytesMetric = statsTracker.getWriteBytes();
final long bytesWritten = statsTracker.getBytesWritten();
final long messagesSent = statsTracker.getMessagesSent();
final long messagesReceived = statsTracker.getMessagesReceived();
final long bytesRead = statsTracker.getBytesRead();
return new TransportStats(acceptedChannels.size(), messagesReceived, bytesRead, messagesSent, bytesWritten);
}

/**
@@ -62,7 +62,8 @@ public void setUp() throws Exception {
TransportHandshaker handshaker = new TransportHandshaker(new ClusterName("cluster-name"), version, threadPool, (n, c, r, v) -> {
}, (v, c, r, r_id) -> { });
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
OutboundHandler outboundHandler = new OutboundHandler("node", version, threadPool, BigArrays.NON_RECYCLING_INSTANCE);
OutboundHandler outboundHandler = new OutboundHandler("node", version, new StatsTracker(), threadPool,
BigArrays.NON_RECYCLING_INSTANCE);
final NoneCircuitBreakerService breaker = new NoneCircuitBreakerService();
handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, breaker, handshaker, keepAlive);
}
@@ -80,8 +81,6 @@ public void testPing() throws Exception {
handler.registerRequestHandler(registry);

handler.inboundMessage(channel, new InboundMessage(null, true));
assertEquals(1, handler.getReadBytes().count());
assertEquals(6, handler.getReadBytes().sum());
if (channel.isServerChannel()) {
BytesReference ping = channel.getMessageCaptor().get();
assertEquals('E', ping.get(0));
@@ -27,6 +27,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
@@ -37,9 +38,9 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;

public class InboundPipelineTests extends ESTestCase {

@@ -82,10 +83,15 @@ public void testPipelineHandling() throws IOException {
};

final PageCacheRecycler recycler = PageCacheRecycler.NON_RECYCLING_INSTANCE;
final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, recycler, messageHandler, errorHandler);
final StatsTracker statsTracker = new StatsTracker();
final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime());
final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, statsTracker, recycler, millisSupplier, messageHandler,
errorHandler);
final FakeTcpChannel channel = new FakeTcpChannel();

final int iterations = randomIntBetween(100, 500);
long totalMessages = 0;
long bytesReceived = 0;

for (int i = 0; i < iterations; ++i) {
actual.clear();
@@ -144,7 +150,8 @@ public void testPipelineHandling() throws IOException {
final BytesReference slice = networkBytes.slice(currentOffset, bytesToRead);
try (ReleasableBytesReference reference = new ReleasableBytesReference(slice, () -> {})) {
toRelease.add(reference);
pipeline.handleBytes(mock(TcpChannel.class), reference);
bytesReceived += reference.length();
pipeline.handleBytes(channel, reference);
currentOffset += bytesToRead;
}
}
@@ -170,6 +177,9 @@ assertEquals(0, released.refCount());
assertEquals(0, released.refCount());
}
}

assertEquals(bytesReceived, statsTracker.getBytesRead());
assertEquals(totalMessages, statsTracker.getMessagesReceived());
}
}

Expand Down
Loading

0 comments on commit 9d861bf

Please sign in to comment.