Skip to content

Commit

Permalink
fix: keep track of internal seek (#87)
Browse files Browse the repository at this point in the history
* fix: keep track of internal seek

* fix: check shutdown on seek

* fix: reset tokens on seek
  • Loading branch information
hannahrogers-google authored May 27, 2020
1 parent 74f0a6e commit a7e09ff
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberServiceStub;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Monitor;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.Optional;
Expand All @@ -56,11 +57,24 @@ public class SubscriberImpl extends ProxyService
private final TokenCounter tokenCounter = new TokenCounter();

@GuardedBy("monitor.monitor")
private Optional<SettableApiFuture<Offset>> inFlightSeek = Optional.empty();
private Optional<InFlightSeek> inFlightSeek = Optional.empty();

@GuardedBy("monitor.monitor")
private boolean internalSeekInFlight = false;

@GuardedBy("monitor.monitor")
private boolean shutdown = false;

private static class InFlightSeek {
final SeekRequest seekRequest;
final SettableApiFuture<Offset> seekFuture;

InFlightSeek(SeekRequest request, SettableApiFuture<Offset> future) {
seekRequest = request;
seekFuture = future;
}
}

@VisibleForTesting
SubscriberImpl(
SubscriberServiceStub stub,
Expand Down Expand Up @@ -91,7 +105,7 @@ public SubscriberImpl(
protected void handlePermanentError(StatusException error) {
try (CloseableMonitor.Hold h = monitor.enter()) {
shutdown = true;
inFlightSeek.ifPresent(inFlight -> inFlight.setException(error));
inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(error));
inFlightSeek = Optional.empty();
onPermanentError(error);
}
Expand All @@ -106,7 +120,7 @@ protected void stop() {
shutdown = true;
inFlightSeek.ifPresent(
inFlight ->
inFlight.setException(
inFlight.seekFuture.setException(
Status.ABORTED
.withDescription("Client stopped while seek in flight.")
.asException()));
Expand All @@ -115,13 +129,21 @@ protected void stop() {

@Override
public ApiFuture<Offset> seek(SeekRequest request) {
try (CloseableMonitor.Hold h = monitor.enter()) {
try (CloseableMonitor.Hold h =
monitor.enterWhenUninterruptibly(
new Monitor.Guard(monitor.monitor) {
@Override
public boolean isSatisfied() {
return !internalSeekInFlight || shutdown;
}
})) {
checkArgument(
Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set.");
checkState(!shutdown, "Seeked after the stream shut down.");
checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
SettableApiFuture<Offset> future = SettableApiFuture.create();
inFlightSeek = Optional.of(future);
inFlightSeek = Optional.of(new InFlightSeek(request, future));
tokenCounter.onClientSeek();
connection.modifyConnection(
connectedSubscriber ->
connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request)));
Expand Down Expand Up @@ -164,13 +186,17 @@ public void triggerReinitialize() {
connectedSubscriber -> {
checkArgument(monitor.monitor.isOccupiedByCurrentThread());
checkArgument(connectedSubscriber.isPresent());
nextOffsetTracker
.requestForRestart()
.ifPresent(
request -> {
inFlightSeek = Optional.of(SettableApiFuture.create());
connectedSubscriber.get().seek(request);
});
if (inFlightSeek.isPresent()) {
connectedSubscriber.get().seek(inFlightSeek.get().seekRequest);
} else {
nextOffsetTracker
.requestForRestart()
.ifPresent(
request -> {
internalSeekInFlight = true;
connectedSubscriber.get().seek(request);
});
}
tokenCounter
.requestForRestart()
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
Expand Down Expand Up @@ -212,9 +238,13 @@ private Status onSeekResponse(Offset seekOffset) {
if (shutdown) {
return Status.OK;
}
if (internalSeekInFlight) {
internalSeekInFlight = false;
return Status.OK;
}
checkState(inFlightSeek.isPresent(), "No in flight seek, but received a seek response.");
nextOffsetTracker.onClientSeek(seekOffset);
inFlightSeek.get().set(seekOffset);
inFlightSeek.get().seekFuture.set(seekOffset);
inFlightSeek = Optional.empty();
return Status.OK;
} catch (StatusException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ void onMessages(Collection<SequencedMessage> received) throws StatusException {
messages -= received.size();
}

void onClientSeek() {
bytes = 0;
messages = 0;
}

Optional<FlowControlRequest> requestForRestart() {
if (bytes == 0 && messages == 0) return Optional.empty();
return Optional.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.pubsublite.internal.StatusExceptionMatcher.assertFutureThrowsCode;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -40,6 +41,7 @@
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.internal.StatusExceptionMatcher;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber.Response;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
Expand Down Expand Up @@ -96,7 +98,7 @@ private static SubscribeRequest initialRequest() {

private final Listener permanentErrorHandler = mock(Listener.class);

private Subscriber subscriber;
private SubscriberImpl subscriber;
private StreamObserver<Response> leakedResponseObserver;

@Before
Expand Down Expand Up @@ -222,4 +224,37 @@ public void messageResponseSubtracts() {
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION)));
}

@Test
public void reinitialize_resendsInFlightSeek() {
Offset offset = Offset.of(1);
SeekRequest seekRequest =
SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset.value())).build();
ApiFuture<Offset> future = subscriber.seek(seekRequest);
assertThat(subscriber.seekInFlight()).isTrue();

subscriber.triggerReinitialize();
verify(mockConnectedSubscriber, times(2)).seek(seekRequest);

leakedResponseObserver.onNext(Response.ofSeekOffset(offset));
assertTrue(future.isDone());
assertThat(subscriber.seekInFlight()).isFalse();
}

@Test
public void reinitialize_sendsNextOffsetSeek() {
subscriber.allowFlow(bigFlowControlRequest());
ImmutableList<SequencedMessage> messages =
ImmutableList.of(
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10),
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
leakedResponseObserver.onNext(Response.ofMessages(messages));
verify(mockMessageConsumer).accept(messages);

subscriber.triggerReinitialize();
verify(mockConnectedSubscriber)
.seek(SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(2)).build());
assertThat(subscriber.seekInFlight()).isFalse();
leakedResponseObserver.onNext(Response.ofSeekOffset(Offset.of(2)));
}
}

0 comments on commit a7e09ff

Please sign in to comment.