Skip to content

Commit

Permalink
Calls StreamObserver.onStop only on termination.
Browse files Browse the repository at this point in the history
This calls onStop once, when the processor is being stopped. It also
documents onCompleted as being called after a processor has finished
due to being configure ot fetch only a subset. Previously onStop
was being called multiple times, for example on every retry.

For #289.
  • Loading branch information
dehora committed Oct 9, 2017
1 parent f4bf38a commit 51e03e1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
7 changes: 6 additions & 1 deletion nakadi-java-client/src/main/java/nakadi/StreamObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ default void onBegin() {
void onStart();

/**
* Called after the stream either cannot be recovered and retried, or has completed.
* Called after the stream either cannot be recovered and retried, or has been stopped.
*/
void onStop();

/**
* Notifies the Observer that the {@link StreamProcessorManaged} has finished sending batches.
*
* <p>
* This signals the end of a stream. which happens when a processor was configured to handle a
* fixed number of items. Typically Nakadi streams are considered infinite.
*</p>
*
* <p>Once the {@link StreamProcessorManaged} calls this method, it will not call
* {@link #onNext}. </p>
*/
Expand Down
9 changes: 6 additions & 3 deletions nakadi-java-client/src/main/java/nakadi/StreamProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public class StreamProcessor implements StreamProcessorManaged {
(t, e) -> handleUncaught(t, e, "stream_processor_err_compute"))
.build());
private final Scheduler monoComputeScheduler = Schedulers.from(monoComputeExecutor);

private Flowable<StreamBatchRecord<?>> _observable;
private StreamObserver streamObserver;

@VisibleForTesting
@SuppressWarnings("unused") StreamProcessor(NakadiClient client,
Expand Down Expand Up @@ -222,6 +223,8 @@ private void stopStreaming() {
this.failedProcessorException.getMessage());
}

streamObserver.onStop();

subscriber.dispose();
logger.debug("op=stream_processor_stop msg=stopping_executor name=monoIoScheduler");
ExecutorServiceSupport.shutdown(monoIoExecutor);
Expand All @@ -233,6 +236,7 @@ private void stopStreaming() {
private <T> void stream(StreamConfiguration sc, StreamObserverProvider<T> provider) {

final StreamObserver<T> observer = provider.createStreamObserver();
streamObserver = observer;
final TypeLiteral<T> literal = provider.typeLiteral();
final Flowable<StreamBatchRecord<T>> observable = this.buildObservable(observer, sc, literal);

Expand Down Expand Up @@ -290,6 +294,7 @@ private <T> Flowable<StreamBatchRecord<T>> buildObservable(
.doOnSubscribe(subscription -> streamObserver.onStart())
.doOnComplete(streamObserver::onCompleted)
.doOnCancel(() -> {
System.out.println("wwwwwwwwwwwwwwwwwww");
if (retryAttemptsFinished.get()) {

logger.info("op=stop_processor msg=stopping_after_stream_retry_finished");
Expand All @@ -299,8 +304,6 @@ private <T> Flowable<StreamBatchRecord<T>> buildObservable(
// land in setupRxErrorHandler's RxJavaPlugins.setErrorHandler.
stopStreaming();

} else {
streamObserver.onStop();
}
})
.timeout(halfOpenKick, halfOpenUnit)
Expand Down

0 comments on commit 51e03e1

Please sign in to comment.