Skip to content

Commit

Permalink
fix: prevent access of closed resources from async scheduled tasks
Browse files Browse the repository at this point in the history
The `asyncActor` used for executing scheduled tasks must be closed
before stopping the stream processor to prevent scheduled tasks from
accessing resources, especially the ZeebeDb, that are closed after the
stream processor stops.

This is done in a somewhat convoluted way because the stream processor
is not allowed to block on the closing of the asyncActor.
Instead, the asyncActor receives the `closeFuture` of the stream
processor and completes this when it is done closing.
Unfortunately, the asyncActor is not guaranteed to run because we only
start it in processing mode. That means that closing the `asyncActor`
needs to check if the actor was never scheduled, or equivalently, is
already closed. In that case it immediately completes the `closeFuture`
of the stream processor.

Luckily, scheduling an actor is atomic in the sense that the volatile
lifecycle phase is set to `queued` and the task is thus no longer in
phase closed. Because both scheduling and requesting the close happens
within the stream processor as actor jobs, there appears to be no
race condition that could leave a zombie `asyncActor`.

(cherry picked from commit 9f900e3)
  • Loading branch information
lenaschoenburg committed Mar 7, 2023
1 parent 55952bd commit e5b0339
Showing 1 changed file with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,11 @@ protected void onActorStarted() {
@Override
protected void onActorClosing() {
tearDown();
asyncActor.close(closeFuture);
}

@Override
protected void onActorClosed() {
closeFuture.complete(null);
LOG.debug("Closed stream processor controller {}.", getName());
}

Expand Down Expand Up @@ -267,7 +267,7 @@ public void onActorFailed() {
isOpened.set(false);
lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onFailed);
tearDown();
closeFuture.complete(null);
asyncActor.close(closeFuture);
}

private boolean shouldProcessNext() {
Expand All @@ -276,7 +276,6 @@ private boolean shouldProcessNext() {

private void tearDown() {
processorActorService.close();
asyncActor.closeAsync();
asyncScheduleService.close();
streamProcessorContext.getLogStreamReader().close();
logStream.removeRecordAvailableListener(this);
Expand Down Expand Up @@ -571,6 +570,7 @@ public void onRecordAvailable() {
private static final class AsyncProcessingScheduleServiceActor extends Actor {

private final ProcessingScheduleServiceImpl scheduleService;
private CompletableActorFuture<Void> closeFuture;

public AsyncProcessingScheduleServiceActor(
final ProcessingScheduleServiceImpl scheduleService) {
Expand All @@ -589,9 +589,28 @@ protected void onActorStarting() {
});
}

@Override
protected void onActorClosed() {
closeFuture.complete(null);
}

@Override
public void onActorFailed() {
closeFuture.complete(null);
}

public ActorControl getActorControl() {
return actor;
}

public void close(final CompletableActorFuture<Void> closeFuture) {
this.closeFuture = closeFuture;
if (actor.isClosed()) {
closeFuture.complete(null);
} else {
actor.close();
}
}
}

private interface Step {
Expand Down

0 comments on commit e5b0339

Please sign in to comment.