Skip to content

Commit

Permalink
Merge pull request #437 from zhenlineo/1.5-redirect-logging
Browse files Browse the repository at this point in the history
Redirect netty logging to driver logging
  • Loading branch information
lutovich authored Nov 29, 2017
2 parents ae43d77 + b90c31c commit 9765378
Show file tree
Hide file tree
Showing 30 changed files with 518 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.net.URI;
Expand All @@ -36,6 +37,7 @@
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
import org.neo4j.driver.internal.logging.NettyLogging;
import org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.retry.RetrySettings;
Expand Down Expand Up @@ -69,6 +71,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );

InternalLoggerFactory.setDefaultFactory( new NettyLogging( config.logging() ) );
Bootstrap bootstrap = createBootstrap();
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class InternalDriver implements Driver
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
this.log = logging.getLog( Driver.class.getSimpleName() );
log.info( "Driver instance %s created", this );
}

@Override
Expand Down Expand Up @@ -112,7 +113,7 @@ public CompletionStage<Void> closeAsync()
{
if ( closed.compareAndSet( false, true ) )
{
log.info( "Driver instance is closing" );
log.info( "Closing driver instance %s", this );
return sessionFactory.close();
}
return completedFuture( null );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static io.netty.buffer.Unpooled.copyShort;
import static io.netty.buffer.Unpooled.unreleasableBuffer;

public final class ProtocolUtil
public final class BoltProtocolV1Util
{
public static final int HTTP = 1213486160; //== 0x48545450 == "HTTP"

Expand All @@ -36,24 +36,30 @@ public final class ProtocolUtil

public static final int DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES = Short.MAX_VALUE / 2;

private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer( copyInt(
private static final ByteBuf BOLT_V1_HANDSHAKE_BUF = unreleasableBuffer( copyInt(
BOLT_MAGIC_PREAMBLE,
PROTOCOL_VERSION_1,
NO_PROTOCOL_VERSION,
NO_PROTOCOL_VERSION,
NO_PROTOCOL_VERSION ) ).asReadOnly();
NO_PROTOCOL_VERSION ) )
.asReadOnly();

private static final ByteBuf MESSAGE_BOUNDARY_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();

private static final ByteBuf CHUNK_HEADER_PLACEHOLDER_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();

private ProtocolUtil()
private BoltProtocolV1Util()
{
}

public static ByteBuf handshake()
public static ByteBuf handshakeBuf()
{
return HANDSHAKE_BUF.duplicate();
return BOLT_V1_HANDSHAKE_BUF.duplicate();
}

public static String handshakeString()
{
return "[0x6060B017, 1, 0, 0, 0]";
}

public static ByteBuf messageBoundary()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@
import io.netty.channel.ChannelPromise;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static java.lang.String.format;
import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.handshakeBuf;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.handshakeString;

public class ChannelConnectedListener implements ChannelFutureListener
{
private final BoltServerAddress address;
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final Logging logging;
private final Logger log;

public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder,
ChannelPromise handshakeCompletedPromise, Logging logging )
Expand All @@ -47,21 +48,22 @@ public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuild
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.logging = logging;
this.log = logging.getLog( getClass().getSimpleName() );
}

@Override
public void operationComplete( ChannelFuture future )
{
Channel channel = future.channel();
Logger log = new ChannelActivityLogger( channel, logging, getClass() );

if ( future.isSuccess() )
{
log.trace( "Channel %s connected, running bolt handshake", channel );
log.trace( "Channel %s connected, initiating bolt handshake", channel );

ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast( new HandshakeHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );
log.debug( "C: [Bolt Handshake] %s", handshakeString() );
ChannelFuture handshakeFuture = channel.writeAndFlush( handshakeBuf() );

handshakeFuture.addListener( channelFuture ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan
public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
{
bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis );
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock ) );
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock, logging ) );

ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.List;
import javax.net.ssl.SSLHandshakeException;

import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
import org.neo4j.driver.internal.util.ErrorUtil;
Expand All @@ -38,9 +38,9 @@
import org.neo4j.driver.v1.exceptions.SecurityException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static org.neo4j.driver.internal.async.ProtocolUtil.HTTP;
import static org.neo4j.driver.internal.async.ProtocolUtil.NO_PROTOCOL_VERSION;
import static org.neo4j.driver.internal.async.ProtocolUtil.PROTOCOL_VERSION_1;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.HTTP;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.NO_PROTOCOL_VERSION;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.PROTOCOL_VERSION_1;

public class HandshakeHandler extends ReplayingDecoder<Void>
{
Expand All @@ -62,7 +62,7 @@ public HandshakeHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise
@Override
public void handlerAdded( ChannelHandlerContext ctx )
{
log = new PrefixedLogger( ctx.channel().toString(), logging, getClass() );
log = new ChannelActivityLogger( ctx.channel(), logging, getClass() );
}

@Override
Expand Down Expand Up @@ -112,7 +112,7 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
{
int serverSuggestedVersion = in.readInt();
log.debug( "Server suggested protocol version %s during handshake", serverSuggestedVersion );
log.debug( "S: [Bolt Handshake] %d", serverSuggestedVersion );

ChannelPipeline pipeline = ctx.pipeline();
// this is a one-time handler, remove it when protocol version has been read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,25 @@
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logging;

import static org.neo4j.driver.internal.async.ChannelAttributes.setCreationTimestamp;
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;

public class NettyChannelInitializer extends ChannelInitializer<Channel>
{
private final BoltServerAddress address;
private final SecurityPlan securityPlan;
private final Clock clock;
private final Logging logging;

public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock )
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock, Logging logging )
{
this.address = address;
this.securityPlan = securityPlan;
this.clock = clock;
this.logging = logging;
}

@Override
Expand Down Expand Up @@ -78,6 +80,6 @@ private void updateChannelAttributes( Channel channel )
{
setServerAddress( channel, address );
setCreationTimestamp( channel, clock.millis() );
setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
setMessageDispatcher( channel, new InboundMessageDispatcher( channel, logging ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import java.io.IOException;

import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
Expand All @@ -50,7 +50,7 @@ public ChannelErrorHandler( Logging logging )
public void handlerAdded( ChannelHandlerContext ctx )
{
messageDispatcher = requireNonNull( messageDispatcher( ctx.channel() ) );
log = new PrefixedLogger( ctx.channel().toString(), logging, getClass() );
log = new ChannelActivityLogger( ctx.channel(), logging, getClass() );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
package org.neo4j.driver.internal.async.inbound;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;

import static io.netty.buffer.ByteBufUtil.prettyHexDump;

public class ChunkDecoder extends LengthFieldBasedFrameDecoder
{
private static final int MAX_FRAME_LENGTH = Short.MAX_VALUE;
Expand All @@ -48,7 +47,7 @@ public ChunkDecoder( Logging logging )
@Override
public void handlerAdded( ChannelHandlerContext ctx )
{
log = new PrefixedLogger( ctx.channel().toString(), logging, getClass() );
log = new ChannelActivityLogger( ctx.channel(), logging, getClass() );
}

@Override
Expand All @@ -65,8 +64,8 @@ protected ByteBuf extractFrame( ChannelHandlerContext ctx, ByteBuf buffer, int i
int originalReaderIndex = buffer.readerIndex();
int readerIndexWithChunkHeader = originalReaderIndex - INITIAL_BYTES_TO_STRIP;
int lengthWithChunkHeader = INITIAL_BYTES_TO_STRIP + buffer.readableBytes();
String hexDump = prettyHexDump( buffer, readerIndexWithChunkHeader, lengthWithChunkHeader );
log.trace( "S:\n%s", hexDump );
String hexDump = ByteBufUtil.hexDump( buffer, readerIndexWithChunkHeader, lengthWithChunkHeader );
log.trace( "S: %s", hexDump );
}
return super.extractFrame( ctx, buffer, index, length );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Queue;

import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.MessageHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ErrorUtil;
Expand All @@ -50,7 +50,7 @@ public class InboundMessageDispatcher implements MessageHandler
public InboundMessageDispatcher( Channel channel, Logging logging )
{
this.channel = requireNonNull( channel );
this.log = new PrefixedLogger( channel.toString(), logging, getClass() );
this.log = new ChannelActivityLogger( channel, logging, getClass() );
}

public void queue( ResponseHandler handler )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderException;

import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;

import static io.netty.buffer.ByteBufUtil.prettyHexDump;
import static io.netty.buffer.ByteBufUtil.hexDump;
import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;

Expand All @@ -52,7 +52,7 @@ public InboundMessageHandler( MessageFormat messageFormat, Logging logging )
public void handlerAdded( ChannelHandlerContext ctx )
{
messageDispatcher = requireNonNull( messageDispatcher( ctx.channel() ) );
log = new PrefixedLogger( ctx.channel().toString(), logging, getClass() );
log = new ChannelActivityLogger( ctx.channel(), logging, getClass() );
}

@Override
Expand All @@ -68,13 +68,13 @@ protected void channelRead0( ChannelHandlerContext ctx, ByteBuf msg )
if ( messageDispatcher.fatalErrorOccurred() )
{
log.warn( "Message ignored because of the previous fatal error. Channel will be closed. Message:\n%s",
prettyHexDump( msg ) );
hexDump( msg ) );
return;
}

if ( log.isTraceEnabled() )
{
log.trace( "S:\n%s", prettyHexDump( msg ) );
log.trace( "S: %s", hexDump( msg ) );
}

input.start( msg );
Expand All @@ -84,7 +84,7 @@ protected void channelRead0( ChannelHandlerContext ctx, ByteBuf msg )
}
catch ( Throwable error )
{
throw new DecoderException( "Failed to read inbound message:\n" + prettyHexDump( msg ) + "\n", error );
throw new DecoderException( "Failed to read inbound message:\n" + hexDump( msg ) + "\n", error );
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.neo4j.driver.internal.packstream.PackOutput;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.async.ProtocolUtil.CHUNK_HEADER_SIZE_BYTES;
import static org.neo4j.driver.internal.async.ProtocolUtil.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES;
import static org.neo4j.driver.internal.async.ProtocolUtil.chunkHeaderPlaceholder;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.CHUNK_HEADER_SIZE_BYTES;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.chunkHeaderPlaceholder;

public class ChunkAwareByteBufOutput implements PackOutput
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@

import java.util.List;

import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;

import static io.netty.buffer.ByteBufUtil.prettyHexDump;
import static org.neo4j.driver.internal.async.ProtocolUtil.messageBoundary;
import static io.netty.buffer.ByteBufUtil.hexDump;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary;

public class OutboundMessageHandler extends MessageToMessageEncoder<Message>
{
Expand Down Expand Up @@ -61,7 +61,7 @@ private OutboundMessageHandler( MessageFormat messageFormat, boolean byteArraySu
@Override
public void handlerAdded( ChannelHandlerContext ctx )
{
log = new PrefixedLogger( ctx.channel().toString(), logging, getClass() );
log = new ChannelActivityLogger( ctx.channel(), logging, getClass() );
}

@Override
Expand Down Expand Up @@ -92,7 +92,7 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out

if ( log.isTraceEnabled() )
{
log.trace( "C:\n%s", prettyHexDump( messageBuf ) );
log.trace( "C: %s", hexDump( messageBuf ) );
}

out.add( messageBuf );
Expand Down
Loading

0 comments on commit 9765378

Please sign in to comment.