From a7e09ff5f0423f672a829fac4ddeeeef7feff57c Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Wed, 27 May 2020 10:31:10 -0700 Subject: [PATCH] fix: keep track of internal seek (#87) * fix: keep track of internal seek * fix: check shutdown on seek * fix: reset tokens on seek --- .../internal/wire/SubscriberImpl.java | 56 ++++++++++++++----- .../internal/wire/TokenCounter.java | 5 ++ .../internal/wire/SubscriberImplTest.java | 37 +++++++++++- 3 files changed, 84 insertions(+), 14 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java index abcd59014..290266f35 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java @@ -34,6 +34,7 @@ import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberServiceStub; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Monitor; import io.grpc.Status; import io.grpc.StatusException; import java.util.Optional; @@ -56,11 +57,24 @@ public class SubscriberImpl extends ProxyService private final TokenCounter tokenCounter = new TokenCounter(); @GuardedBy("monitor.monitor") - private Optional> inFlightSeek = Optional.empty(); + private Optional inFlightSeek = Optional.empty(); + + @GuardedBy("monitor.monitor") + private boolean internalSeekInFlight = false; @GuardedBy("monitor.monitor") private boolean shutdown = false; + private static class InFlightSeek { + final SeekRequest seekRequest; + final SettableApiFuture seekFuture; + + InFlightSeek(SeekRequest request, SettableApiFuture future) { + seekRequest = request; + seekFuture = future; + } + } + @VisibleForTesting SubscriberImpl( SubscriberServiceStub stub, @@ -91,7 +105,7 @@ public SubscriberImpl( protected void handlePermanentError(StatusException error) { try (CloseableMonitor.Hold h = monitor.enter()) { shutdown = true; - inFlightSeek.ifPresent(inFlight -> inFlight.setException(error)); + inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(error)); inFlightSeek = Optional.empty(); onPermanentError(error); } @@ -106,7 +120,7 @@ protected void stop() { shutdown = true; inFlightSeek.ifPresent( inFlight -> - inFlight.setException( + inFlight.seekFuture.setException( Status.ABORTED .withDescription("Client stopped while seek in flight.") .asException())); @@ -115,13 +129,21 @@ protected void stop() { @Override public ApiFuture seek(SeekRequest request) { - try (CloseableMonitor.Hold h = monitor.enter()) { + try (CloseableMonitor.Hold h = + monitor.enterWhenUninterruptibly( + new Monitor.Guard(monitor.monitor) { + @Override + public boolean isSatisfied() { + return !internalSeekInFlight || shutdown; + } + })) { checkArgument( Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set."); checkState(!shutdown, "Seeked after the stream shut down."); checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight."); SettableApiFuture future = SettableApiFuture.create(); - inFlightSeek = Optional.of(future); + inFlightSeek = Optional.of(new InFlightSeek(request, future)); + tokenCounter.onClientSeek(); connection.modifyConnection( connectedSubscriber -> connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request))); @@ -164,13 +186,17 @@ public void triggerReinitialize() { connectedSubscriber -> { checkArgument(monitor.monitor.isOccupiedByCurrentThread()); checkArgument(connectedSubscriber.isPresent()); - nextOffsetTracker - .requestForRestart() - .ifPresent( - request -> { - inFlightSeek = Optional.of(SettableApiFuture.create()); - connectedSubscriber.get().seek(request); - }); + if (inFlightSeek.isPresent()) { + connectedSubscriber.get().seek(inFlightSeek.get().seekRequest); + } else { + nextOffsetTracker + .requestForRestart() + .ifPresent( + request -> { + internalSeekInFlight = true; + connectedSubscriber.get().seek(request); + }); + } tokenCounter .requestForRestart() .ifPresent(request -> connectedSubscriber.get().allowFlow(request)); @@ -212,9 +238,13 @@ private Status onSeekResponse(Offset seekOffset) { if (shutdown) { return Status.OK; } + if (internalSeekInFlight) { + internalSeekInFlight = false; + return Status.OK; + } checkState(inFlightSeek.isPresent(), "No in flight seek, but received a seek response."); nextOffsetTracker.onClientSeek(seekOffset); - inFlightSeek.get().set(seekOffset); + inFlightSeek.get().seekFuture.set(seekOffset); inFlightSeek = Optional.empty(); return Status.OK; } catch (StatusException e) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java index 51242ca2a..8ee3c44c9 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java @@ -50,6 +50,11 @@ void onMessages(Collection received) throws StatusException { messages -= received.size(); } + void onClientSeek() { + bytes = 0; + messages = 0; + } + Optional requestForRestart() { if (bytes == 0 && messages == 0) return Optional.empty(); return Optional.of( diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java index 8f1c9b7e8..6e08297fb 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java @@ -19,6 +19,7 @@ import static com.google.cloud.pubsublite.internal.StatusExceptionMatcher.assertFutureThrowsCode; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -40,6 +41,7 @@ import com.google.cloud.pubsublite.SubscriptionPaths; import com.google.cloud.pubsublite.internal.StatusExceptionMatcher; import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber.Response; +import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.FlowControlRequest; import com.google.cloud.pubsublite.proto.InitialSubscribeRequest; import com.google.cloud.pubsublite.proto.SeekRequest; @@ -96,7 +98,7 @@ private static SubscribeRequest initialRequest() { private final Listener permanentErrorHandler = mock(Listener.class); - private Subscriber subscriber; + private SubscriberImpl subscriber; private StreamObserver leakedResponseObserver; @Before @@ -222,4 +224,37 @@ public void messageResponseSubtracts() { verify(permanentErrorHandler) .failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION))); } + + @Test + public void reinitialize_resendsInFlightSeek() { + Offset offset = Offset.of(1); + SeekRequest seekRequest = + SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset.value())).build(); + ApiFuture future = subscriber.seek(seekRequest); + assertThat(subscriber.seekInFlight()).isTrue(); + + subscriber.triggerReinitialize(); + verify(mockConnectedSubscriber, times(2)).seek(seekRequest); + + leakedResponseObserver.onNext(Response.ofSeekOffset(offset)); + assertTrue(future.isDone()); + assertThat(subscriber.seekInFlight()).isFalse(); + } + + @Test + public void reinitialize_sendsNextOffsetSeek() { + subscriber.allowFlow(bigFlowControlRequest()); + ImmutableList messages = + ImmutableList.of( + SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10), + SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10)); + leakedResponseObserver.onNext(Response.ofMessages(messages)); + verify(mockMessageConsumer).accept(messages); + + subscriber.triggerReinitialize(); + verify(mockConnectedSubscriber) + .seek(SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(2)).build()); + assertThat(subscriber.seekInFlight()).isFalse(); + leakedResponseObserver.onNext(Response.ofSeekOffset(Offset.of(2))); + } }