Skip to content

Commit

Permalink
refactor: iterate over closing
Browse files Browse the repository at this point in the history
Close AsyncActor before StreamProcessor actor, to ensure that first actor is closed before resources are set to null. Furthermore, to simplify closing lifecycle. AsyncActor gets own closing future, which is set to completed by default and will be initialized when starting.

When StreamProcessor closeAsync is called StreamProcessor submits new job in order to schedule closing procedure.

When Failure occures the AsyncActor will also be closed first, before all other resources and actor.

(cherry picked from commit e16065b)
  • Loading branch information
ChrisKujawa authored and lenaschoenburg committed Mar 7, 2023
1 parent e5b0339 commit 9c16218
Showing 1 changed file with 31 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected void onActorStarted() {
@Override
protected void onActorClosing() {
tearDown();
asyncActor.close(closeFuture);
closeFuture.complete(null);
}

@Override
Expand All @@ -252,7 +252,12 @@ protected void onActorCloseRequested() {
@Override
public ActorFuture<Void> closeAsync() {
isOpened.set(false);
actor.close();

actor.run(
() -> {
asyncActor.closeAsync().onComplete((v, t) -> actor.close());
});

return closeFuture;
}

Expand All @@ -267,7 +272,6 @@ public void onActorFailed() {
isOpened.set(false);
lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onFailed);
tearDown();
asyncActor.close(closeFuture);
}

private boolean shouldProcessNext() {
Expand Down Expand Up @@ -417,18 +421,23 @@ private void onRecovered(final LastProcessingPositions lastProcessingPositions)

private void onFailure(final Throwable throwable) {
LOG.error("Actor {} failed in phase {}.", actorName, actor.getLifecyclePhase(), throwable);
actor.fail(throwable);
if (!openFuture.isDone()) {
openFuture.completeExceptionally(throwable);
}

if (throwable instanceof UnrecoverableException) {
final var report = HealthReport.dead(this).withIssue(throwable);
failureListeners.forEach(l -> l.onUnrecoverableFailure(report));
} else {
final var report = HealthReport.unhealthy(this).withIssue(throwable);
failureListeners.forEach(l -> l.onFailure(report));
}
final var asyncActorCloseFuture = asyncActor.closeAsync();
asyncActorCloseFuture.onComplete(
(v, t) -> {
actor.fail(throwable);
if (!openFuture.isDone()) {
openFuture.completeExceptionally(throwable);
}

if (throwable instanceof UnrecoverableException) {
final var report = HealthReport.dead(this).withIssue(throwable);
failureListeners.forEach(l -> l.onUnrecoverableFailure(report));
} else {
final var report = HealthReport.unhealthy(this).withIssue(throwable);
failureListeners.forEach(l -> l.onFailure(report));
}
});
}

public boolean isOpened() {
Expand Down Expand Up @@ -570,7 +579,7 @@ public void onRecordAvailable() {
private static final class AsyncProcessingScheduleServiceActor extends Actor {

private final ProcessingScheduleServiceImpl scheduleService;
private CompletableActorFuture<Void> closeFuture;
private CompletableActorFuture<Void> closeFuture = CompletableActorFuture.completed(null);

public AsyncProcessingScheduleServiceActor(
final ProcessingScheduleServiceImpl scheduleService) {
Expand All @@ -587,13 +596,20 @@ protected void onActorStarting() {
actor.fail(t);
}
});
closeFuture = new CompletableActorFuture<>();
}

@Override
protected void onActorClosed() {
closeFuture.complete(null);
}

@Override
public CompletableActorFuture<Void> closeAsync() {
actor.close();
return closeFuture;
}

@Override
public void onActorFailed() {
closeFuture.complete(null);
Expand All @@ -602,15 +618,6 @@ public void onActorFailed() {
public ActorControl getActorControl() {
return actor;
}

public void close(final CompletableActorFuture<Void> closeFuture) {
this.closeFuture = closeFuture;
if (actor.isClosed()) {
closeFuture.complete(null);
} else {
actor.close();
}
}
}

private interface Step {
Expand Down

0 comments on commit 9c16218

Please sign in to comment.