Skip to content

Commit

Permalink
Move transport decoding and aggregation to server (#54360)
Browse files Browse the repository at this point in the history
Currently all of our transport protocol decoding and aggregation occurs
in the individual transport modules. This means that each implementation
(test, netty, nio) must implement this logic. Additionally, it means
that the entire message has been read from the network before the server
package receives it.

This commit creates a pipeline in server which can be passed arbitrary
bytes to handle. Internally, the pipeline will decode, decompress, and
aggregate the messages. Additionally, this allows us to run many
megabytes of bytes through the pipeline in tests to ensure that the
logic works.

This work will enable future work:

Circuit breaking or backoff logic based on message type and byte
in the content aggregator.
Sharing bytes with the application layer using the ref counted
releasable network bytes.
Improved network monitoring based specifically on channels.
Finally, this fixes the bug where we do not circuit break on the correct
message size when compression is enabled.
  • Loading branch information
Tim-Brooks authored Mar 27, 2020
1 parent 1630de4 commit 2ccddbf
Show file tree
Hide file tree
Showing 34 changed files with 2,157 additions and 792 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.Transports;

import java.nio.channels.ClosedChannelException;
Expand All @@ -45,23 +49,23 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();

private WriteOperation currentWrite;
private final InboundPipeline pipeline;

Netty4MessageChannelHandler(Netty4Transport transport) {
Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) {
this.transport = transport;
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert Transports.assertTransportThread();
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();

final ByteBuf buffer = (ByteBuf) msg;
try {
Channel channel = ctx.channel();
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer));
} finally {
buffer.release();
Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, buffer::release)) {
pipeline.handleBytes(channel, reference);
}
}

Expand Down Expand Up @@ -104,6 +108,7 @@ public void flush(ChannelHandlerContext ctx) {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
doFlush(ctx);
Releasables.closeWhileHandlingException(pipeline);
super.channelInactive(ctx);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,8 @@ protected class ClientChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
}

@Override
Expand All @@ -349,8 +348,7 @@ protected void initChannel(Channel ch) throws Exception {
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
serverAcceptedChannel(nettyTcpChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected Function<DiscoveryNode, TcpChannelFactory> clientChannelFactoryFunctio
return (n) -> new TcpChannelFactoryImpl(profileSettings, true);
}

protected abstract class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {
protected abstract static class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {

protected TcpChannelFactory(ProfileSettings profileSettings) {
super(profileSettings.tcpNoDelay, profileSettings.tcpKeepAlive, profileSettings.tcpKeepIdle, profileSettings.tcpKeepInterval,
Expand All @@ -162,8 +162,8 @@ private TcpChannelFactoryImpl(ProfileSettings profileSettings, boolean isClient)
@Override
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) {
NioTcpChannel nioChannel = new NioTcpChannel(isClient == false, profileName, channel);
TcpReadWriteHandler handler = new TcpReadWriteHandler(nioChannel, NioTransport.this);
Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
TcpReadWriteHandler handler = new TcpReadWriteHandler(nioChannel, pageCacheRecycler, NioTransport.this);
BytesChannelContext context = new BytesChannelContext(nioChannel, selector, socketConfig, exceptionHandler, handler,
new InboundChannelBuffer(pageAllocator));
nioChannel.setContext(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,47 @@
package org.elasticsearch.transport.nio;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.nio.BytesWriteHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.Page;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.TcpTransport;

import java.io.IOException;

public class TcpReadWriteHandler extends BytesWriteHandler {

private final NioTcpChannel channel;
private final TcpTransport transport;
private final InboundPipeline pipeline;

public TcpReadWriteHandler(NioTcpChannel channel, TcpTransport transport) {
public TcpReadWriteHandler(NioTcpChannel channel, PageCacheRecycler recycler, TcpTransport transport) {
this.channel = channel;
this.transport = transport;
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
}

@Override
public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
BytesReference bytesReference = BytesReference.fromByteBuffers(channelBuffer.sliceBuffersTo(channelBuffer.getIndex()));
return transport.consumeNetworkReads(channel, bytesReference);
Page[] pages = channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex());
BytesReference[] references = new BytesReference[pages.length];
for (int i = 0; i < pages.length; ++i) {
references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer());
}
Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages);
try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) {
pipeline.handleBytes(channel, reference);
return reference.length();
}
}

@Override
public void close() {
Releasables.closeWhileHandlingException(pipeline);
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ static BytesReference fromByteBuffers(ByteBuffer[] buffers) {
if (bufferCount == 0) {
return BytesArray.EMPTY;
} else if (bufferCount == 1) {
return new ByteBufferReference(buffers[0]);
return fromByteBuffer(buffers[0]);
} else {
ByteBufferReference[] references = new ByteBufferReference[bufferCount];
for (int i = 0; i < bufferCount; ++i) {
Expand All @@ -102,6 +102,13 @@ static BytesReference fromByteBuffers(ByteBuffer[] buffers) {
}
}

/**
* Returns BytesReference composed of the provided ByteBuffer.
*/
static BytesReference fromByteBuffer(ByteBuffer buffer) {
return new ByteBufferReference(buffer);
}

/**
* Returns the byte at the specified index. Need to be between 0 and length.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -33,21 +34,43 @@
* An extension to {@link BytesReference} that requires releasing its content. This
* class exists to make it explicit when a bytes reference needs to be released, and when not.
*/
public final class ReleasableBytesReference implements Releasable, BytesReference {
public final class ReleasableBytesReference extends AbstractRefCounted implements Releasable, BytesReference {

public static final Releasable NO_OP = () -> {};
private final BytesReference delegate;
private final Releasable releasable;

public ReleasableBytesReference(BytesReference delegate, Releasable releasable) {
super("bytes-reference");
this.delegate = delegate;
this.releasable = releasable;
}

public static ReleasableBytesReference wrap(BytesReference reference) {
return new ReleasableBytesReference(reference, NO_OP);
}

@Override
public void close() {
protected void closeInternal() {
Releasables.close(releasable);
}

public ReleasableBytesReference retain() {
incRef();
return this;
}

public ReleasableBytesReference retainedSlice(int from, int length) {
BytesReference slice = delegate.slice(from, length);
incRef();
return new ReleasableBytesReference(slice, this);
}

@Override
public void close() {
decRef();
}

@Override
public byte get(int index) {
return delegate.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface Compressor {

boolean isCompressed(BytesReference bytes);

int headerLength();

StreamInput streamInput(StreamInput in) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public boolean isCompressed(BytesReference bytes) {
return true;
}

@Override
public int headerLength() {
return HEADER.length;
}

@Override
public StreamInput streamInput(StreamInput in) throws IOException {
final byte[] headerBytes = new byte[HEADER.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static void close(Releasable... releasables) {
}

/** Release the provided {@link Releasable}s, ignoring exceptions. */
public static void closeWhileHandlingException(Iterable<Releasable> releasables) {
public static void closeWhileHandlingException(Iterable<? extends Releasable> releasables) {
close(releasables, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,12 @@ public void writeTo(StreamOutput out) throws IOException {
* Reads the headers from the stream into the current context
*/
public void readHeaders(StreamInput in) throws IOException {
final Tuple<Map<String, String>, Map<String, Set<String>>> streamTuple = readHeadersFromStream(in);
final Map<String, String> requestHeaders = streamTuple.v1();
final Map<String, Set<String>> responseHeaders = streamTuple.v2();
setHeaders(readHeadersFromStream(in));
}

public void setHeaders(Tuple<Map<String, String>, Map<String, Set<String>>> headerTuple) {
final Map<String, String> requestHeaders = headerTuple.v1();
final Map<String, Set<String>> responseHeaders = headerTuple.v2();
final ThreadContextStruct struct;
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
struct = ThreadContextStruct.EMPTY;
Expand Down
Loading

0 comments on commit 2ccddbf

Please sign in to comment.