From 321c127a0ff19ff3a28376c2779a2e3f087e5016 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 18 Jul 2016 10:10:16 -0700 Subject: [PATCH 1/5] core: Reduce DeadlineTest flake To my knowledge, there has been just a single DeadlineTest flake since the code was fixed to avoid issues with I/O due to class loading: io.grpc.DeadlineTest > defaultTickerIsSystemTicker[0] FAILED java.lang.AssertionError: <-21431071 ns from now> and <0 ns from now> should have been within <20000000ns> of each other But we don't really need fine-grained verification during the test though; if the code is not using nanoTime, then it is almost certainly not going to have even a day of accuracy (except on a fresh VM). So checking for a second of accuracy vs 20ms shouldn't really be an issue. --- core/src/test/java/io/grpc/DeadlineTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/io/grpc/DeadlineTest.java b/core/src/test/java/io/grpc/DeadlineTest.java index 658c9019e7f..2df9959d32e 100644 --- a/core/src/test/java/io/grpc/DeadlineTest.java +++ b/core/src/test/java/io/grpc/DeadlineTest.java @@ -81,7 +81,7 @@ public void defaultTickerIsSystemTicker() { ticker.reset(System.nanoTime()); Deadline reference = Deadline.after(0, TimeUnit.SECONDS, ticker); // Allow inaccuracy to account for system time advancing during test. - assertAbout(deadline()).that(d).isWithin(20, TimeUnit.MILLISECONDS).of(reference); + assertAbout(deadline()).that(d).isWithin(1, TimeUnit.SECONDS).of(reference); } @Test From c14edca1b88f75703ad1b239d1485587824798ea Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Sun, 31 Jul 2016 19:15:40 -0700 Subject: [PATCH 2/5] inprocess: Avoid creating unnecessary threads Implementations of ManagedClientTransport.start() are restricted from calling the passed listener until start() returns, in order to avoid reentrency problems with locks. For most transports this isn't a problem, because they need additional threads anyway. InProcess uses no additional threads naturally so ends up needing a thread just to notifyReady. Now transports can just return a Runnable that can be run after locks are dropped. This was originally intended to be a performance optimization, but the thread also causes nondeterminism because RPCs are delayed until notifyReady is called. So avoiding the thread reduces needless fakes during tests. --- .../io/grpc/inprocess/InProcessTransport.java | 19 +++---- .../grpc/internal/DelayedClientTransport.java | 3 +- .../ForwardingConnectionClientTransport.java | 4 +- .../grpc/internal/ManagedClientTransport.java | 12 +++-- .../java/io/grpc/internal/TransportSet.java | 41 ++++++++------ .../io/grpc/netty/NettyClientTransport.java | 3 +- .../io/grpc/okhttp/OkHttpClientTransport.java | 3 +- .../testing/AbstractTransportTest.java | 54 ++++++++++--------- 8 files changed, 80 insertions(+), 59 deletions(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index e2181766af5..5c085c20c29 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -62,6 +62,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.CheckReturnValue; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -89,8 +90,9 @@ public InProcessTransport(String name) { .build(); } + @CheckReturnValue @Override - public synchronized void start(ManagedClientTransport.Listener listener) { + public synchronized Runnable start(ManagedClientTransport.Listener listener) { this.clientTransportListener = listener; InProcessServer server = InProcessServer.findServer(name); if (server != null) { @@ -99,7 +101,7 @@ public synchronized void start(ManagedClientTransport.Listener listener) { if (serverTransportListener == null) { shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name); final Status localShutdownStatus = shutdownStatus; - Thread shutdownThread = new Thread(new Runnable() { + return new Runnable() { @Override public void run() { synchronized (InProcessTransport.this) { @@ -107,23 +109,16 @@ public void run() { notifyTerminated(); } } - }); - shutdownThread.setDaemon(true); - shutdownThread.setName("grpc-inprocess-shutdown"); - shutdownThread.start(); - return; + }; } - Thread readyThread = new Thread(new Runnable() { + return new Runnable() { @Override public void run() { synchronized (InProcessTransport.this) { clientTransportListener.transportReady(); } } - }); - readyThread.setDaemon(true); - readyThread.setName("grpc-inprocess-ready"); - readyThread.start(); + }; } @Override diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 7a0e157e667..4e28bcec4ad 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -95,8 +95,9 @@ class DelayedClientTransport implements ManagedClientTransport { } @Override - public void start(Listener listener) { + public Runnable start(Listener listener) { this.listener = Preconditions.checkNotNull(listener, "listener"); + return null; } /** diff --git a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java index 9f973c21528..b79feb6d71f 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java @@ -41,8 +41,8 @@ abstract class ForwardingConnectionClientTransport implements ConnectionClientTransport { @Override - public void start(Listener listener) { - delegate().start(listener); + public Runnable start(Listener listener) { + return delegate().start(listener); } @Override diff --git a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java index a1fbf97bf48..cbe2c088c9c 100644 --- a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java @@ -33,6 +33,8 @@ import io.grpc.Status; +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** @@ -53,12 +55,16 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId { * Starts transport. This method may only be called once. * *

Implementations must not call {@code listener} from within {@link #start}; implementations - * are expected to notify listener on a separate thread. This method should not throw any - * exceptions. + * are expected to notify listener on a separate thread or when the returned {@link Runnable} is + * run. This method and the returned {@code Runnable} should not throw any exceptions. * * @param listener non-{@code null} listener of transport events + * @return a {@link Runnable} that is executed after-the-fact by the original caller, typically + * after locks are released */ - void start(Listener listener); + @CheckReturnValue + @Nullable + Runnable start(Listener listener); /** * Initiates an orderly shutdown of the transport. Existing streams continue, but the transport diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 2bb3405ea18..025b83276fc 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -174,25 +174,32 @@ final ClientTransport obtainActiveTransport() { if (savedTransport != null) { return savedTransport; } + Runnable runnable; synchronized (lock) { // Check again, since it could have changed before acquiring the lock - if (activeTransport == null) { - if (shutdown) { - return SHUTDOWN_TRANSPORT; - } - // Transition to CONNECTING - DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor); - transports.add(delayedTransport); - delayedTransport.start(new BaseTransportListener(delayedTransport)); - activeTransport = delayedTransport; - startNewTransport(delayedTransport); + savedTransport = activeTransport; + if (savedTransport != null) { + return savedTransport; + } + if (shutdown) { + return SHUTDOWN_TRANSPORT; } - return activeTransport; + // Transition to CONNECTING + DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor); + transports.add(delayedTransport); + delayedTransport.start(new BaseTransportListener(delayedTransport)); + savedTransport = activeTransport = delayedTransport; + runnable = startNewTransport(delayedTransport); } + if (runnable != null) { + runnable.run(); + } + return savedTransport; } + @CheckReturnValue @GuardedBy("lock") - private void startNewTransport(DelayedClientTransport delayedTransport) { + private Runnable startNewTransport(DelayedClientTransport delayedTransport) { Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); if (nextAddressIndex == 0) { @@ -212,7 +219,7 @@ private void startNewTransport(DelayedClientTransport delayedTransport) { } pendingTransport = transport; transports.add(transport); - transport.start(new TransportListener(transport, delayedTransport, address)); + return transport.start(new TransportListener(transport, delayedTransport, address)); } /** @@ -241,17 +248,21 @@ public void run() { try { delayedTransport.endBackoff(); boolean shutdownDelayedTransport = false; + Runnable runnable = null; synchronized (lock) { reconnectTask = null; if (delayedTransport.hasPendingStreams()) { // Transition directly to CONNECTING - startNewTransport(delayedTransport); + runnable = startNewTransport(delayedTransport); } else { // Transition to IDLE (or already SHUTDOWN) activeTransport = null; shutdownDelayedTransport = true; } } + if (runnable != null) { + runnable.run(); + } if (shutdownDelayedTransport) { delayedTransport.setTransportSupplier(new Supplier() { @Override @@ -453,7 +464,7 @@ public void transportShutdown(Status s) { runnable = scheduleBackoff(delayedTransport, s); } else { // Still CONNECTING - startNewTransport(delayedTransport); + runnable = startNewTransport(delayedTransport); } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0d201ccf13a..94c92c9b78a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -133,7 +133,7 @@ public ClientStream newStream(MethodDescriptor method, Metadata headers) { } @Override - public void start(Listener transportListener) { + public Runnable start(Listener transportListener) { lifecycleManager = new ClientTransportLifecycleManager( Preconditions.checkNotNull(transportListener, "listener")); @@ -191,6 +191,7 @@ public void operationComplete(ChannelFuture future) throws Exception { Status.INTERNAL.withDescription("Connection closed with unknown cause")); } }); + return null; } @Override diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index bfd8d240706..248a696af3a 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -344,7 +344,7 @@ void removePendingStream(OkHttpClientStream pendingStream) { } @Override - public void start(Listener listener) { + public Runnable start(Listener listener) { this.listener = Preconditions.checkNotNull(listener, "listener"); if (enableKeepAlive) { @@ -433,6 +433,7 @@ public void close() {} } } }); + return null; } @Override diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index b185fde0fa2..41732e878b5 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -182,7 +182,7 @@ public void tearDown() throws InterruptedException { public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -232,7 +232,7 @@ public void serverNotListening() throws Exception { server = null; InOrder inOrder = inOrder(mockClientTransportListener); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture()); assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue()); @@ -246,7 +246,7 @@ public void clientStartStop() throws Exception { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); client.shutdown(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture()); @@ -260,7 +260,7 @@ public void clientStartAndStopOnceConnected() throws Exception { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -288,7 +288,7 @@ public void serverAlreadyListening() throws Exception { public void openStreamPreventsTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -339,7 +339,7 @@ public void openStreamPreventsTermination() throws Exception { public void shutdownNowKillsClientStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -370,7 +370,7 @@ public void shutdownNowKillsClientStream() throws Exception { public void shutdownNowKillsServerStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -403,7 +403,7 @@ public void shutdownNowKillsServerStream() throws Exception { public void ping() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); try { client.ping(mockPingCallback, MoreExecutors.directExecutor()); @@ -419,7 +419,7 @@ public void ping() throws Exception { public void ping_duringShutdown() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata()); stream.start(mockClientStreamListener); @@ -440,7 +440,7 @@ public void ping_duringShutdown() throws Exception { public void ping_afterTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); client.shutdown(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); @@ -466,7 +466,7 @@ public void ping_afterTermination() throws Exception { public void newStream_duringShutdown() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata()); stream.start(mockClientStreamListener); @@ -497,7 +497,7 @@ public void newStream_afterTermination() throws Exception { // dealing with afterTermination is harder than duringShutdown. server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); client.shutdown(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); @@ -514,7 +514,7 @@ public void newStream_afterTermination() throws Exception { public void transportInUse_normalClose() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata()); stream1.start(mockClientStreamListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); @@ -542,7 +542,7 @@ public void transportInUse_normalClose() throws Exception { public void transportInUse_clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata()); stream1.start(mockClientStreamListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); @@ -562,7 +562,7 @@ public void transportInUse_clientCancel() throws Exception { public void basicStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -640,7 +640,7 @@ public void basicStream() throws Exception { public void zeroMessageStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -672,7 +672,7 @@ public void zeroMessageStream() throws Exception { public void earlyServerClose_withServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -702,7 +702,7 @@ public void earlyServerClose_withServerHeaders() throws Exception { public void earlyServerClose_noServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -739,7 +739,7 @@ public void earlyServerClose_noServerHeaders() throws Exception { public void earlyServerClose_serverFailure() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -767,7 +767,7 @@ public void earlyServerClose_serverFailure() throws Exception { public void clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -798,7 +798,7 @@ public void clientCancel() throws Exception { public void clientCancelFromWithinMessageRead() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -852,7 +852,7 @@ public void onReady() { public void serverCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -889,7 +889,7 @@ public void serverCancel() throws Exception { public void flowControlPushBack() throws Exception { server.start(serverListener); client = newClientTransport(server); - client.start(mockClientTransportListener); + runIfNotNull(client.start(mockClientTransportListener)); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1052,7 +1052,7 @@ public Void answer(InvocationOnMock invocation) throws Exception { */ private void doPingPong(MockServerListener serverListener) throws InterruptedException { ManagedClientTransport client = newClientTransport(server); - client.start(mock(ManagedClientTransport.Listener.class)); + runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata()); ClientStreamListener mockClientStreamListener = mock(ClientStreamListener.class); clientStream.start(mockClientStreamListener); @@ -1100,6 +1100,12 @@ private static boolean waitForFuture(Future future, long timeout, TimeUnit un return true; } + private static void runIfNotNull(Runnable runnable) { + if (runnable != null) { + runnable.run(); + } + } + private static class MockServerListener implements ServerListener { public final BlockingQueue listeners = new LinkedBlockingQueue(); From 22393f913377cd09ca10b6535f90866ff00b3236 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 4 Aug 2016 11:16:39 -0700 Subject: [PATCH 3/5] core: Avoid wrapping Errors in RuntimeException --- .../io/grpc/internal/LogExceptionRunnable.java | 5 ++++- .../main/java/io/grpc/internal/ServerImpl.java | 18 +++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/LogExceptionRunnable.java b/core/src/main/java/io/grpc/internal/LogExceptionRunnable.java index 359a35f9c54..2b0e5b2a050 100644 --- a/core/src/main/java/io/grpc/internal/LogExceptionRunnable.java +++ b/core/src/main/java/io/grpc/internal/LogExceptionRunnable.java @@ -33,6 +33,8 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Throwables; + import java.util.logging.Level; import java.util.logging.Logger; @@ -56,7 +58,8 @@ public void run() { task.run(); } catch (Throwable t) { log.log(Level.SEVERE, "Exception while executing runnable " + task, t); - throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t); + Throwables.propagateIfPossible(t); + throw new AssertionError(t); } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index edf11bc69af..985987fa7fb 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -363,10 +363,10 @@ public void runInContext() { stream.close(Status.fromThrowable(e), new Metadata()); context.cancel(null); throw e; - } catch (Throwable t) { - stream.close(Status.fromThrowable(t), new Metadata()); + } catch (Error e) { + stream.close(Status.fromThrowable(e), new Metadata()); context.cancel(null); - throw new RuntimeException(t); + throw e; } finally { jumpListener.setListener(listener); } @@ -486,9 +486,9 @@ public void runInContext() { } catch (RuntimeException e) { internalClose(Status.fromThrowable(e), new Metadata()); throw e; - } catch (Throwable t) { - internalClose(Status.fromThrowable(t), new Metadata()); - throw new RuntimeException(t); + } catch (Error e) { + internalClose(Status.fromThrowable(e), new Metadata()); + throw e; } } }); @@ -504,9 +504,9 @@ public void runInContext() { } catch (RuntimeException e) { internalClose(Status.fromThrowable(e), new Metadata()); throw e; - } catch (Throwable t) { - internalClose(Status.fromThrowable(t), new Metadata()); - throw new RuntimeException(t); + } catch (Error e) { + internalClose(Status.fromThrowable(e), new Metadata()); + throw e; } } }); From 29fefe2a3fadce448eac56a3c098161283062d44 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Thu, 1 Sep 2016 16:00:43 -0700 Subject: [PATCH 4/5] core: split Context into a separate grpc-context artifact. The Context API is not particularly gRPC-specific, and will be used by Census as its context propagation mechanism. Removed all dependencies to make it easy for other libraries to depend on. Manually resolved conflicts: context/src/main/java/io/grpc/Context.java context/src/main/java/io/grpc/Deadline.java core/build.gradle --- all/build.gradle | 1 + context/build.gradle | 14 +++++ .../src/main/java/io/grpc/Context.java | 51 +++++++++++------- .../src/main/java/io/grpc/Deadline.java | 53 +++++++++++++++---- .../src/test/java/io/grpc/ContextTest.java | 0 .../src/test/java/io/grpc/DeadlineTest.java | 2 +- core/build.gradle | 3 +- .../test/java/io/grpc/CallOptionsTest.java | 23 +++++++- settings.gradle | 2 + 9 files changed, 118 insertions(+), 31 deletions(-) create mode 100644 context/build.gradle rename {core => context}/src/main/java/io/grpc/Context.java (96%) rename {core => context}/src/main/java/io/grpc/Deadline.java (83%) rename {core => context}/src/test/java/io/grpc/ContextTest.java (100%) rename {core => context}/src/test/java/io/grpc/DeadlineTest.java (99%) diff --git a/all/build.gradle b/all/build.gradle index a84d9eeda5a..dd32d9cce20 100644 --- a/all/build.gradle +++ b/all/build.gradle @@ -14,6 +14,7 @@ buildscript { def subprojects = [ project(':grpc-auth'), project(':grpc-core'), + project(':grpc-context'), project(':grpc-netty'), project(':grpc-okhttp'), project(':grpc-protobuf'), diff --git a/context/build.gradle b/context/build.gradle new file mode 100644 index 00000000000..0566f6a0f31 --- /dev/null +++ b/context/build.gradle @@ -0,0 +1,14 @@ +plugins { + id "be.insaneprogramming.gradle.animalsniffer" version "1.4.0" +} + +description = 'gRPC: Context' + +dependencies { + testCompile project(':grpc-testing') +} + +// Configure the animal sniffer plugin +animalsniffer { + signature = "org.codehaus.mojo.signature:java16:+@signature" +} diff --git a/core/src/main/java/io/grpc/Context.java b/context/src/main/java/io/grpc/Context.java similarity index 96% rename from core/src/main/java/io/grpc/Context.java rename to context/src/main/java/io/grpc/Context.java index 810396f91e5..60e060d26ca 100644 --- a/core/src/main/java/io/grpc/Context.java +++ b/context/src/main/java/io/grpc/Context.java @@ -31,9 +31,6 @@ package io.grpc; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; - import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -44,8 +41,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; - /** * A context propagation mechanism which can carry scoped-values across API boundaries and between * threads. Examples of state propagated via context include: @@ -275,8 +270,8 @@ public CancellableContext withDeadlineAfter(long duration, TimeUnit unit, */ public CancellableContext withDeadline(Deadline deadline, ScheduledExecutorService scheduler) { - Preconditions.checkNotNull(deadline, "deadline"); - Preconditions.checkNotNull(scheduler, "scheduler"); + checkNotNull(deadline, "deadline"); + checkNotNull(scheduler, "scheduler"); return new CancellableContext(this, deadline, scheduler); } @@ -349,7 +344,7 @@ public Context attach() { * will still be bound. */ public void detach(Context toAttach) { - Preconditions.checkNotNull(toAttach); + checkNotNull(toAttach, "toAttach"); if (toAttach.attach() != this) { // Log a severe message instead of throwing an exception as the context to attach is assumed // to be the correct one and the unbalanced state represents a coding mistake in a lower @@ -383,7 +378,6 @@ public boolean isCancelled() { *

The cancellation cause is provided for informational purposes only and implementations * should generally assume that it has already been handled and logged properly. */ - @Nullable public Throwable cancellationCause() { if (parent == null || !cascadesCancellation) { return null; @@ -396,7 +390,6 @@ public Throwable cancellationCause() { * A context may have an associated {@link Deadline} at which it will be automatically cancelled. * @return A {@link io.grpc.Deadline} or {@code null} if no deadline is set. */ - @Nullable public Deadline getDeadline() { return DEADLINE_KEY.get(this); } @@ -406,8 +399,8 @@ public Deadline getDeadline() { */ public void addListener(final CancellationListener cancellationListener, final Executor executor) { - Preconditions.checkNotNull(cancellationListener); - Preconditions.checkNotNull(executor); + checkNotNull(cancellationListener, "cancellationListener"); + checkNotNull(executor, "executor"); if (canBeCancelled) { ExecutableListener executableListener = new ExecutableListener(executor, cancellationListener); @@ -420,7 +413,7 @@ public void addListener(final CancellationListener cancellationListener, // we can cascade listener notification. listeners = new ArrayList(); listeners.add(executableListener); - parent.addListener(parentListener, MoreExecutors.directExecutor()); + parent.addListener(parentListener, DirectExecutor.INSTANCE); } else { listeners.add(executableListener); } @@ -685,13 +678,13 @@ public boolean isCurrent() { } /** - * Cancel this context and optionally provide a cause for the cancellation. This - * will trigger notification of listeners. + * Cancel this context and optionally provide a cause (can be {@code null}) for the + * cancellation. This will trigger notification of listeners. * * @return {@code true} if this context cancelled the context and notified listeners, * {@code false} if the context was already cancelled. */ - public boolean cancel(@Nullable Throwable cause) { + public boolean cancel(Throwable cause) { boolean triggeredCancel = false; synchronized (this) { if (!cancelled) { @@ -717,7 +710,7 @@ public boolean cancel(@Nullable Throwable cause) { * @param toAttach context to make current. * @param cause of cancellation, can be {@code null}. */ - public void detachAndCancel(Context toAttach, @Nullable Throwable cause) { + public void detachAndCancel(Context toAttach, Throwable cause) { try { detach(toAttach); } finally { @@ -741,7 +734,6 @@ public boolean isCancelled() { return false; } - @Nullable @Override public Throwable cancellationCause() { if (isCancelled()) { @@ -774,7 +766,7 @@ public static class Key { } Key(String name, T defaultValue) { - this.name = Preconditions.checkNotNull(name); + this.name = checkNotNull(name, "name"); this.defaultValue = defaultValue; } @@ -838,4 +830,25 @@ public void cancelled(Context context) { } } } + + private static T checkNotNull(T reference, Object errorMessage) { + if (reference == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return reference; + } + + private enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public String toString() { + return "Context.DirectExecutor"; + } + } } diff --git a/core/src/main/java/io/grpc/Deadline.java b/context/src/main/java/io/grpc/Deadline.java similarity index 83% rename from core/src/main/java/io/grpc/Deadline.java rename to context/src/main/java/io/grpc/Deadline.java index d23a8cf509b..3514ddf1a96 100644 --- a/core/src/main/java/io/grpc/Deadline.java +++ b/context/src/main/java/io/grpc/Deadline.java @@ -31,14 +31,11 @@ package io.grpc; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import io.grpc.internal.LogExceptionRunnable; - import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** * An absolute deadline in system time. @@ -60,9 +57,9 @@ public static Deadline after(long duration, TimeUnit units) { return after(duration, units, SYSTEM_TICKER); } - @VisibleForTesting + // For testing static Deadline after(long duration, TimeUnit units, Ticker ticker) { - Preconditions.checkNotNull(units); + checkNotNull(units, "units"); return new Deadline(ticker, units.toNanos(duration), true); } @@ -148,8 +145,8 @@ public long timeRemaining(TimeUnit unit) { * @return {@link ScheduledFuture} which can be used to cancel execution of the task */ public ScheduledFuture runOnExpiration(Runnable task, ScheduledExecutorService scheduler) { - Preconditions.checkNotNull(task, "task"); - Preconditions.checkNotNull(scheduler, "scheduler"); + checkNotNull(task, "task"); + checkNotNull(scheduler, "scheduler"); return scheduler.schedule(new LogExceptionRunnable(task), deadlineNanos - ticker.read(), TimeUnit.NANOSECONDS); } @@ -182,4 +179,42 @@ public long read() { return System.nanoTime(); } } + + private static T checkNotNull(T reference, Object errorMessage) { + if (reference == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return reference; + } + + private static class LogExceptionRunnable implements Runnable { + private static final Logger log = Logger.getLogger(LogExceptionRunnable.class.getName()); + + private final Runnable task; + + public LogExceptionRunnable(Runnable task) { + this.task = checkNotNull(task, "task"); + } + + @Override + public void run() { + try { + task.run(); + } catch (Throwable t) { + log.log(Level.SEVERE, "Exception while executing runnable " + task, t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else { + throw new RuntimeException(t); + } + } + } + + @Override + public String toString() { + return "Deadline.LogExceptionRunnable(" + task + ")"; + } + } } diff --git a/core/src/test/java/io/grpc/ContextTest.java b/context/src/test/java/io/grpc/ContextTest.java similarity index 100% rename from core/src/test/java/io/grpc/ContextTest.java rename to context/src/test/java/io/grpc/ContextTest.java diff --git a/core/src/test/java/io/grpc/DeadlineTest.java b/context/src/test/java/io/grpc/DeadlineTest.java similarity index 99% rename from core/src/test/java/io/grpc/DeadlineTest.java rename to context/src/test/java/io/grpc/DeadlineTest.java index 2df9959d32e..f85edf18cd3 100644 --- a/core/src/test/java/io/grpc/DeadlineTest.java +++ b/context/src/test/java/io/grpc/DeadlineTest.java @@ -265,7 +265,7 @@ public void toString_before() { assertEquals("12000 ns from now", d.toString()); } - static class FakeTicker extends Deadline.Ticker { + private static class FakeTicker extends Deadline.Ticker { private long time; @Override diff --git a/core/build.gradle b/core/build.gradle index f7ccc82a75c..d851e38f58e 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -6,7 +6,8 @@ description = 'gRPC: Core' dependencies { compile libraries.guava, - libraries.jsr305 + libraries.jsr305, + project(':grpc-context') testCompile project(':grpc-testing') } diff --git a/core/src/test/java/io/grpc/CallOptionsTest.java b/core/src/test/java/io/grpc/CallOptionsTest.java index 40b46356c4e..c295d8088c4 100644 --- a/core/src/test/java/io/grpc/CallOptionsTest.java +++ b/core/src/test/java/io/grpc/CallOptionsTest.java @@ -51,13 +51,14 @@ import org.junit.runners.JUnit4; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; /** Unit tests for {@link CallOptions}. */ @RunWith(JUnit4.class) public class CallOptionsTest { private String sampleAuthority = "authority"; private String sampleCompressor = "compressor"; - private Deadline.Ticker ticker = new DeadlineTest.FakeTicker(); + private Deadline.Ticker ticker = new FakeTicker(); private Deadline sampleDeadline = Deadline.after(1, NANOSECONDS, ticker); private Key sampleKey = Attributes.Key.of("sample"); private Attributes sampleAffinity = Attributes.newBuilder().set(sampleKey, "blah").build(); @@ -233,4 +234,24 @@ private static boolean equal(CallOptions o1, CallOptions o2) { && Objects.equal(o1.getAffinity(), o2.getAffinity()) && Objects.equal(o1.getCredentials(), o2.getCredentials()); } + + private static class FakeTicker extends Deadline.Ticker { + private long time; + + @Override + public long read() { + return time; + } + + public void reset(long time) { + this.time = time; + } + + public void increment(long period, TimeUnit unit) { + if (period < 0) { + throw new IllegalArgumentException(); + } + this.time += unit.toNanos(period); + } + } } diff --git a/settings.gradle b/settings.gradle index 42496c2eeb3..b991a930b60 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,5 +1,6 @@ rootProject.name = "grpc" include ":grpc-core" +include ":grpc-context" include ":grpc-stub" include ":grpc-auth" include ":grpc-okhttp" @@ -15,6 +16,7 @@ include ":grpc-benchmarks" include ":grpc-services" project(':grpc-core').projectDir = "$rootDir/core" as File +project(':grpc-context').projectDir = "$rootDir/context" as File project(':grpc-stub').projectDir = "$rootDir/stub" as File project(':grpc-auth').projectDir = "$rootDir/auth" as File project(':grpc-okhttp').projectDir = "$rootDir/okhttp" as File From 75863cdbe6f76b896fad40b6fee5361c1f240965 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Mon, 12 Sep 2016 09:58:46 -0700 Subject: [PATCH 5/5] Fix a deadlock in TransportSet. Honor the lock order that transport lock > channel lock. Resolves #2246 --- core/src/main/java/io/grpc/internal/TransportSet.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 025b83276fc..d89a8ac49c3 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -249,9 +249,14 @@ public void run() { delayedTransport.endBackoff(); boolean shutdownDelayedTransport = false; Runnable runnable = null; + // TransportSet as a channel layer class should not call into transport methods while + // holding the lock, thus we call hasPendingStreams() outside of the lock. It will cause + // a _benign_ race where the TransportSet may transition to CONNECTING when there is not + // pending stream. + boolean hasPendingStreams = delayedTransport.hasPendingStreams(); synchronized (lock) { reconnectTask = null; - if (delayedTransport.hasPendingStreams()) { + if (hasPendingStreams) { // Transition directly to CONNECTING runnable = startNewTransport(delayedTransport); } else {