From 8832d37749120362b473ff84d7911ceed571e0b4 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Wed, 31 Jan 2024 13:15:35 -0800 Subject: [PATCH 1/4] Fix retries that timeout hanging forever. (#10855) Fixes #10336 --- core/src/main/java/io/grpc/internal/RetriableStream.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 1cb2a668a45..56c9c9d68d5 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -195,7 +195,10 @@ public void run() { } } if (retryFuture != null) { - retryFuture.cancel(false); + boolean cancelled = retryFuture.cancel(false); + if (cancelled) { + inFlightSubStreams.decrementAndGet(); + } } if (hedgingFuture != null) { hedgingFuture.cancel(false); From 9de490fa227bd162574dfb3aa440ad3912c6db54 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 5 Feb 2024 10:54:55 -0800 Subject: [PATCH 2/4] Fix flaky retry tests (#10887) * Reorder tracing and actually closing listener to eliminate test flakiness * Use real value rather than mock for flaky test --- .../grpc/internal/AbstractClientStream.java | 2 +- .../grpc/testing/integration/RetryTest.java | 44 +++++++++++++++---- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 4ef743bf96d..a4ebfa52d63 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -455,10 +455,10 @@ private void closeListener( if (!listenerClosed) { listenerClosed = true; statsTraceCtx.streamClosed(status); - listener().closed(status, rpcProgress, trailers); if (getTransportTracer() != null) { getTransportTracer().reportStreamClosed(status.isOk()); } + listener().closed(status, rpcProgress, trailers); } } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index 72ed8bf975b..b5a6c199325 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -18,7 +18,10 @@ import static com.google.common.truth.Truth.assertThat; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertNotNull; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -78,8 +81,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -103,8 +104,11 @@ public class RetryTest { @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private final FakeClock fakeClock = new FakeClock(); - @Mock - private ClientCall.Listener mockCallListener; + private TestListener testCallListener = new TestListener(); + @SuppressWarnings("unchecked") + private ClientCall.Listener mockCallListener = + mock(ClientCall.Listener.class, delegatesTo(testCallListener)); + private CountDownLatch backoffLatch = new CountDownLatch(1); private final EventLoopGroup group = new DefaultEventLoopGroup() { @SuppressWarnings("FutureReturnValueIgnored") @@ -244,8 +248,10 @@ private void assertInboundWireSizeRecorded(long length) throws Exception { private void assertRpcStatusRecorded( Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception { - MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + MetricsRecord record = clientStatsRecorder.pollRecord(7, SECONDS); + assertNotNull(record); TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertNotNull(statusTag); assertThat(statusTag.asString()).isEqualTo(code.toString()); assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)) .isEqualTo(1); @@ -295,14 +301,14 @@ public void retryUntilBufferLimitExceeded() throws Exception { verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); // send one more message, should exceed buffer limit call.sendMessage(message); + // let attempt fail + testCallListener.clear(); serverCall.close( Status.UNAVAILABLE.withDescription("2nd attempt failed"), new Metadata()); // no more retry - ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class)); - assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed"); + testCallListener.verifyDescription("2nd attempt failed", 5000); } @Test @@ -534,4 +540,26 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header assertRpcStatusRecorded(Code.INVALID_ARGUMENT, 0, 0); assertRetryStatsRecorded(0, 1, 0); } + + private static class TestListener extends ClientCall.Listener { + Status status = null; + private CountDownLatch closeLatch = new CountDownLatch(1); + + @Override + public void onClose(Status status, Metadata trailers) { + this.status = status; + closeLatch.countDown(); + } + + void clear() { + status = null; + closeLatch = new CountDownLatch(1); + } + + void verifyDescription(String description, long timeoutMs) throws InterruptedException { + closeLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + assertNotNull(status); + assertThat(status.getDescription()).contains(description); + } + } } From 88a9bbbb66945be9ce7d08b47f94915fdfeadb7b Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 5 Feb 2024 16:53:22 -0800 Subject: [PATCH 3/4] Use future from real scheduling of the command to make sure that desired command is run. --- .../test/java/io/grpc/testing/integration/RetryTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index b5a6c199325..a2e1a186794 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -108,6 +108,7 @@ public class RetryTest { @SuppressWarnings("unchecked") private ClientCall.Listener mockCallListener = mock(ClientCall.Listener.class, delegatesTo(testCallListener)); + private java.util.concurrent.ScheduledFuture activeFuture = null; private CountDownLatch backoffLatch = new CountDownLatch(1); private final EventLoopGroup group = new DefaultEventLoopGroup() { @@ -118,7 +119,7 @@ public ScheduledFuture schedule( if (!command.getClass().getName().contains("RetryBackoffRunnable")) { return super.schedule(command, delay, unit); } - fakeClock.getScheduledExecutorService().schedule( + activeFuture = fakeClock.getScheduledExecutorService().schedule( new Runnable() { @Override public void run() { @@ -307,6 +308,8 @@ public void retryUntilBufferLimitExceeded() throws Exception { serverCall.close( Status.UNAVAILABLE.withDescription("2nd attempt failed"), new Metadata()); + fakeClock.forwardTime(1, SECONDS); + activeFuture.get(1, SECONDS); // Make sure the close is done. // no more retry testCallListener.verifyDescription("2nd attempt failed", 5000); } @@ -420,6 +423,9 @@ public void streamClosed(Status status) { call.cancel("Cancelled before commit", null); // Let the netty substream listener be closed. streamClosedLatch.countDown(); + assertNotNull("No activeFuture", activeFuture); + fakeClock.forwardTime(1, SECONDS); + activeFuture.get(1, SECONDS); // The call listener is closed. verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1); From 45787b098ed31c6d84a7548bebb4956a06576b6c Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 5 Feb 2024 17:58:06 -0800 Subject: [PATCH 4/4] Fix test --- .../src/test/java/io/grpc/testing/integration/RetryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index a2e1a186794..7a5bba7add1 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -428,7 +428,7 @@ public void streamClosed(Status status) { activeFuture.get(1, SECONDS); // The call listener is closed. verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); - assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1); + assertRpcStatusRecorded(Code.CANCELLED, 18_000, 1); assertRetryStatsRecorded(1, 0, 0); }