Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prohibit blocking operations in IO threads #434

Merged
merged 3 commits into from
Nov 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
{
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
eventExecutorGroup, securityPlan, retryLogic );
Futures.getBlocking( driver.verifyConnectivity() );
Futures.blockingGet( driver.verifyConnectivity() );
return driver;
}
catch ( Throwable driverError )
{
// we need to close the connection pool if driver creation threw exception
try
{
Futures.getBlocking( connectionPool.close() );
Futures.blockingGet( connectionPool.close() );
}
catch ( Throwable closeError )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@

import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.blockingGet;
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.internal.util.Futures.getBlocking;
import static org.neo4j.driver.v1.Values.value;

public class ExplicitTransaction implements Transaction
Expand Down Expand Up @@ -150,7 +150,7 @@ public void failure()
@Override
public void close()
{
getBlocking( closeAsync() );
blockingGet( closeAsync() );
}

CompletionStage<Void> closeAsync()
Expand Down Expand Up @@ -274,7 +274,7 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
@Override
public StatementResult run( Statement statement )
{
StatementResultCursor cursor = getBlocking( run( statement, false ) );
StatementResultCursor cursor = blockingGet( run( statement, false ) );
return new InternalStatementResult( cursor );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.neo4j.driver.v1.Session;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.getBlocking;
import static org.neo4j.driver.internal.util.Futures.blockingGet;

public class InternalDriver implements Driver
{
Expand Down Expand Up @@ -104,7 +104,7 @@ private Session newSession( AccessMode mode, Bookmark bookmark )
@Override
public void close()
{
getBlocking( closeAsync() );
blockingGet( closeAsync() );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.util.Function;

import static org.neo4j.driver.internal.util.Futures.getBlocking;
import static org.neo4j.driver.internal.util.Futures.blockingGet;

public class InternalStatementResult implements StatementResult
{
Expand All @@ -45,7 +45,7 @@ public List<String> keys()
{
if ( keys == null )
{
getBlocking( cursor.peekAsync() );
blockingGet( cursor.peekAsync() );
keys = cursor.keys();
}
return keys;
Expand All @@ -54,13 +54,13 @@ public List<String> keys()
@Override
public boolean hasNext()
{
return getBlocking( cursor.peekAsync() ) != null;
return blockingGet( cursor.peekAsync() ) != null;
}

@Override
public Record next()
{
Record record = getBlocking( cursor.nextAsync() );
Record record = blockingGet( cursor.nextAsync() );
if ( record == null )
{
throw new NoSuchRecordException( "No more records" );
Expand All @@ -71,13 +71,13 @@ public Record next()
@Override
public Record single()
{
return getBlocking( cursor.singleAsync() );
return blockingGet( cursor.singleAsync() );
}

@Override
public Record peek()
{
Record record = getBlocking( cursor.peekAsync() );
Record record = blockingGet( cursor.peekAsync() );
if ( record == null )
{
throw new NoSuchRecordException( "Cannot peek past the last record" );
Expand All @@ -88,25 +88,25 @@ public Record peek()
@Override
public List<Record> list()
{
return getBlocking( cursor.listAsync() );
return blockingGet( cursor.listAsync() );
}

@Override
public <T> List<T> list( Function<Record, T> mapFunction )
{
return getBlocking( cursor.listAsync( mapFunction ) );
return blockingGet( cursor.listAsync( mapFunction ) );
}

@Override
public ResultSummary consume()
{
return getBlocking( cursor.consumeAsync() );
return blockingGet( cursor.consumeAsync() );
}

@Override
public ResultSummary summary()
{
return getBlocking( cursor.summaryAsync() );
return blockingGet( cursor.summaryAsync() );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected void finalize() throws Throwable

private void logLeakIfNeeded()
{
Boolean isOpen = Futures.getBlocking( currentConnectionIsOpen() );
Boolean isOpen = Futures.blockingGet( currentConnectionIsOpen() );
if ( isOpen )
{
logger.error( "Neo4j Session object leaked, please ensure that your application" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import org.neo4j.driver.v1.types.TypeSystem;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.blockingGet;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.internal.util.Futures.getBlocking;
import static org.neo4j.driver.v1.Values.value;

public class NetworkSession implements Session
Expand Down Expand Up @@ -132,7 +132,7 @@ public CompletionStage<StatementResultCursor> runAsync( String statementText, Va
@Override
public StatementResult run( Statement statement )
{
StatementResultCursor cursor = getBlocking( runAsync( statement, false ) );
StatementResultCursor cursor = blockingGet( runAsync( statement, false ) );
return new InternalStatementResult( cursor );
}

Expand All @@ -152,7 +152,7 @@ public boolean isOpen()
@Override
public void close()
{
getBlocking( closeAsync() );
blockingGet( closeAsync() );
}

@Override
Expand Down Expand Up @@ -189,7 +189,7 @@ public CompletionStage<Void> closeAsync()
@Override
public Transaction beginTransaction()
{
return getBlocking( beginTransactionAsync( mode ) );
return blockingGet( beginTransactionAsync( mode ) );
}

@Deprecated
Expand Down Expand Up @@ -248,7 +248,7 @@ public String lastBookmark()
@Override
public void reset()
{
getBlocking( resetAsync() );
blockingGet( resetAsync() );
}

private CompletionStage<Void> resetAsync()
Expand Down Expand Up @@ -288,7 +288,7 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
// event loop thread will bock and wait for itself to read some data
return retryLogic.retry( () ->
{
try ( Transaction tx = getBlocking( beginTransactionAsync( mode ) ) )
try ( Transaction tx = blockingGet( beginTransactionAsync( mode ) ) )
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.EventLoopGroup;

public final class BootstrapFactory
{
Expand All @@ -31,19 +30,19 @@ private BootstrapFactory()

public static Bootstrap newBootstrap()
{
return newBootstrap( new NioEventLoopGroup() );
return newBootstrap( EventLoopGroupFactory.newEventLoopGroup() );
}

public static Bootstrap newBootstrap( int threadCount )
{
return newBootstrap( new NioEventLoopGroup( threadCount ) );
return newBootstrap( EventLoopGroupFactory.newEventLoopGroup( threadCount ) );
}

private static Bootstrap newBootstrap( NioEventLoopGroup eventLoopGroup )
private static Bootstrap newBootstrap( EventLoopGroup eventLoopGroup )
{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group( eventLoopGroup );
bootstrap.channel( NioSocketChannel.class );
bootstrap.channel( EventLoopGroupFactory.channelClass() );
bootstrap.option( ChannelOption.SO_KEEPALIVE, true );
bootstrap.option( ChannelOption.SO_REUSEADDR, true );
return bootstrap;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.async;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.FastThreadLocalThread;

import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

import org.neo4j.driver.v1.Session;

/**
* Manages creation of Netty {@link EventLoopGroup}s, which are basically {@link Executor}s that perform IO operations.
*/
public final class EventLoopGroupFactory
{
private static final String THREAD_NAME_PREFIX = "Neo4jDriverIO";
private static final int THREAD_PRIORITY = Thread.MAX_PRIORITY;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should stick with Thread.NORM_PRIORITY. Maybe we can depend on a system property to override the default, if such a requirement arises.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAX_PRIORITY is what netty uses inside. Here I'm basically duplicating default behaviour just to create threads of our own class. Such thread can then be easily detected with simple instanceof.


private EventLoopGroupFactory()
{
}

/**
* Get class of {@link Channel} for {@link Bootstrap#channel(Class)} method.
*
* @return class of the channel, which should be consistent with {@link EventLoopGroup}s returned by
* {@link #newEventLoopGroup()} and {@link #newEventLoopGroup(int)}.
*/
public static Class<? extends Channel> channelClass()
{
return NioSocketChannel.class;
}

/**
* Create new {@link EventLoopGroup} with specified thread count. Returned group should by given to
* {@link Bootstrap#group(EventLoopGroup)}.
*
* @param threadCount amount of IO threads for the new group.
* @return new group consistent with channel class returned by {@link #channelClass()}.
*/
public static EventLoopGroup newEventLoopGroup( int threadCount )
{
return new DriverEventLoopGroup( threadCount );
}

/**
* Create new {@link EventLoopGroup} with default thread count. Returned group should by given to
* {@link Bootstrap#group(EventLoopGroup)}.
*
* @return new group consistent with channel class returned by {@link #channelClass()}.
*/
public static EventLoopGroup newEventLoopGroup()
{
return new DriverEventLoopGroup();
}

/**
* Assert that current thread is not an event loop used for async IO operations. This check is needed because
* blocking API methods like {@link Session#run(String)} are implemented on top of corresponding async API methods
* like {@link Session#runAsync(String)} using basically {@link Future#get()} calls. Deadlocks might happen when IO
* thread executes blocking API call and has to wait for itself to read from the network.
*
* @throws IllegalStateException when current thread is an event loop IO thread.
*/
public static void assertNotInEventLoopThread() throws IllegalStateException
{
if ( Thread.currentThread() instanceof DriverThread )
{
throw new IllegalStateException(
"Blocking operation can't be executed in IO thread because it might result in a deadlock. " +
"Please do not use blocking API when chaining futures returned by async API methods." );
}
}

/**
* Same as {@link NioEventLoopGroup} but uses a different {@link ThreadFactory} that produces threads of
* {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}.
*/
private static class DriverEventLoopGroup extends NioEventLoopGroup
{
DriverEventLoopGroup()
{
}

DriverEventLoopGroup( int nThreads )
{
super( nThreads );
}

@Override
protected ThreadFactory newDefaultThreadFactory()
{
return new DriverThreadFactory();
}
}

/**
* Same as {@link DefaultThreadFactory} created by {@link NioEventLoopGroup} by default, except produces threads of
* {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}.
*/
private static class DriverThreadFactory extends DefaultThreadFactory
{
DriverThreadFactory()
{
super( THREAD_NAME_PREFIX, THREAD_PRIORITY );
}

@Override
protected Thread newThread( Runnable r, String name )
{
return new DriverThread( threadGroup, r, name );
}
}

/**
* Same as default thread created by {@link DefaultThreadFactory} except this dedicated class can be easily
* recognized by {@link #assertNotInEventLoopThread()}.
*/
private static class DriverThread extends FastThreadLocalThread
{
DriverThread( ThreadGroup group, Runnable target, String name )
{
super( group, target, name );
}
}
}
Loading