Merge #5891
5891: [BACKPORT 0.25] handle failures during transitions r=deepthidevaki a=MiguelPires

## Description

We've seen nodes get stuck in invalid states because of errors during role transitions. This PR improves the handling of these failures by:

* ensuring that all steps are closed even if one of them fails (nothing is gained by leaving some pieces of state open)
* notifying partition listeners if a transition fails or if the partition transitions to inactive (we want to close things like the commandApiService even if the follower wasn't successfully installed)
* still trying to install the new partition if an error occurs during the closing part of a transition (some closing failures are expected, in which case we shouldn't prevent the install)
* stepping down to follower if an error occurs while trying to install the leader partition
* becoming inactive if an error occurs while trying to install the follower partition
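
The first point (close every step, but still report a failure) can be illustrated with a minimal, synchronous sketch. This is not the code from this PR: the actual change works on `ActorFuture`s in `stepByStepClosing`, while the `Step` interface, the `closeAll` method, and the `step` helper below are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class CloseAllSteps {

  /** Hypothetical stand-in for a partition step; not the Zeebe PartitionStep interface. */
  interface Step {
    String name();

    void close() throws Exception;
  }

  /**
   * Closes every opened step in reverse opening order. A failing step does not stop the loop.
   * Only the first failure is remembered and returned (null if every step closed cleanly).
   */
  static Exception closeAll(final List<Step> openedSteps) {
    final List<Step> closing = new ArrayList<>(openedSteps);
    Collections.reverse(closing);

    Exception firstError = null;
    for (final Step step : closing) {
      try {
        step.close();
      } catch (final Exception e) {
        if (firstError == null) {
          firstError = e;
        }
      }
      // The step is dropped from the opened list whether or not its close succeeded.
      openedSteps.remove(step);
    }
    return firstError;
  }

  /** Test helper (an assumption of this sketch): a step that optionally fails on close. */
  static Step step(final String stepName, final boolean failsOnClose, final List<String> closedLog) {
    return new Step() {
      @Override
      public String name() {
        return stepName;
      }

      @Override
      public void close() throws Exception {
        if (failsOnClose) {
          throw new IllegalStateException(stepName + " failed to close");
        }
        closedLog.add(stepName);
      }
    };
  }

  public static void main(final String[] args) {
    final List<String> closedLog = new ArrayList<>();
    final List<Step> opened =
        new ArrayList<>(
            List.of(
                step("a", false, closedLog),
                step("b", true, closedLog),
                step("c", false, closedLog)));

    final Exception error = closeAll(opened);
    System.out.println("closed: " + closedLog); // closed: [c, a]
    System.out.println("first error: " + error.getMessage()); // first error: b failed to close
    System.out.println("still open: " + opened); // still open: []
  }
}
```

In this sketch the caller decides what to do with the returned error. In the PR, the transition ignores a closing error and proceeds with the install, since some closing failures are expected.
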
## Related issues

<!-- Which issues are closed by this PR or are related -->
related #5117
related #5291

backports #5813

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [ ] The changes are backwards compatible with previous versions
* [ ] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. `backport stable/0.25`) to the PR; if that fails, you need to create the backports manually.

Testing:
* [ ] There are unit/integration tests that verify all acceptance criteria of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


Co-authored-by: Miguel Pires <miguel.pires@camunda.com>
zeebe-bors[bot] and Miguel Pires authored Nov 23, 2020
2 parents 0fc339a + 4d6add5 commit d19415e
Showing 27 changed files with 388 additions and 241 deletions.
7 changes: 7 additions & 0 deletions atomix/cluster/src/main/java/io/atomix/raft/RaftServer.java
@@ -449,6 +449,13 @@ default CompletableFuture<RaftServer> listen(final MemberId... cluster) {
*/
CompletableFuture<Void> shutdown();

/**
* Transitions the server to INACTIVE without shutting down or leaving the cluster.
*
* @return A completable future to be completed once the server is inactive.
*/
CompletableFuture<Void> goInactive();

/**
* Leaves the Raft cluster.
*
@@ -143,6 +143,18 @@ public CompletableFuture<Void> shutdown() {
return future;
}

@Override
public CompletableFuture<Void> goInactive() {
final CompletableFuture<Void> future = new AtomixFuture<>();
context
.getThreadContext()
.execute(
() -> {
context.transition(Role.INACTIVE);
future.complete(null);
});
return future;
}
/**
* Leaves the Raft cluster.
*
@@ -228,6 +228,10 @@ public CompletableFuture<Void> stepDown() {
return server.stepDown();
}

public CompletableFuture<Void> goInactive() {
return server.goInactive();
}

private void onFailure() {
CompletableFuture.allOf(
raftFailureListeners.stream()
@@ -190,6 +190,10 @@ public CompletableFuture<Void> leave() {
return server.leave();
}

public CompletableFuture<Void> goInactive() {
return server.goInactive();
}

/**
* Takes a snapshot of the partition server.
*
10 changes: 10 additions & 0 deletions broker/src/main/java/io/zeebe/broker/PartitionListener.java
@@ -38,4 +38,14 @@ public interface PartitionListener {
* @return future that should be completed by the listener
*/
ActorFuture<Void> onBecomingLeader(int partitionId, long term, LogStream logStream);

/**
* Is called by the {@link io.zeebe.broker.system.partitions.ZeebePartition} on becoming inactive
* after a Raft role change or a failed transition.
*
* @param partitionId the corresponding partition id
* @param term the current term
* @return future that should be completed by the listener
*/
ActorFuture<Void> onBecomingInactive(int partitionId, long term);
}
@@ -69,6 +69,11 @@ public ActorFuture<Void> onBecomingLeader(
return setLeader(term, partitionId);
}

@Override
public ActorFuture<Void> onBecomingInactive(final int partitionId, final long term) {
return setInactive(partitionId);
}

@Override
public String getName() {
return actorName;
@@ -104,6 +109,15 @@ public ActorFuture<Void> setFollower(final int partitionId) {
});
}

public ActorFuture<Void> setInactive(final int partitionId) {
return actor.call(
() -> {
removeIfLeader(localBroker, partitionId);
localBroker.setInactiveForPartition(partitionId);
publishTopologyChanges();
});
}

@Override
public void event(final ClusterMembershipEvent clusterMembershipEvent) {
final Member eventSource = clusterMembershipEvent.subject();
@@ -83,6 +83,15 @@ public ActorFuture<Void> onBecomingLeader(
return future;
}

@Override
public ActorFuture<Void> onBecomingInactive(final int partitionId, final long term) {
return actor.call(
() -> {
leaderPartitions.remove(partitionId);
return null;
});
}

@Override
public void onDiskSpaceNotAvailable() {
actor.call(
@@ -83,6 +83,15 @@ public ActorFuture<Void> onBecomingLeader(
return future;
}

@Override
public ActorFuture<Void> onBecomingInactive(final int partitionId, final long term) {
return actor.call(
() -> {
leaderForPartitions.remove(partitionId);
return null;
});
}

@Override
public String getName() {
return actorName;
@@ -22,6 +22,7 @@
import io.zeebe.util.health.HealthStatus;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -131,6 +132,11 @@ public ActorFuture<Void> onBecomingLeader(
return updateBrokerReadyStatus(partitionId);
}

@Override
public ActorFuture<Void> onBecomingInactive(final int partitionId, final long term) {
return CompletableActorFuture.completed(null);
}

private ActorFuture<Void> updateBrokerReadyStatus(final int partitionId) {
return actor.call(
() -> {
@@ -122,13 +122,13 @@ private ActorFuture<Void> leaderTransition(final long newTerm) {
t -> {
// Compare with the current term in case a new role transition happened
if (t != null && term == newTerm) {
onInstallFailure();
onInstallFailure(newTerm);
}
});
onRecoveredInternal();
} else {
LOG.error("Failed to install leader partition {}", context.getPartitionId(), error);
onInstallFailure();
onInstallFailure(newTerm);
}
});
return leaderTransitionFuture;
@@ -148,13 +148,13 @@ private ActorFuture<Void> followerTransition(final long newTerm) {
t -> {
// Compare with the current term in case a new role transition happened
if (t != null && term == newTerm) {
onInstallFailure();
onInstallFailure(newTerm);
}
});
onRecoveredInternal();
} else {
LOG.error("Failed to install follower partition {}", context.getPartitionId(), error);
onInstallFailure();
onInstallFailure(newTerm);
}
});
return followerTransitionFuture;
@@ -251,7 +251,7 @@ protected void handleFailure(final Exception failure) {
LOG.warn("Uncaught exception in {}.", actorName, failure);
// Most probably exception happened in the middle of installing leader or follower services
// because this actor is not doing anything else
onInstallFailure();
onInstallFailure(term);
}

@Override
@@ -272,11 +272,19 @@ public void onRecovered() {
});
}

private void onInstallFailure() {
private void onInstallFailure(final long term) {
zeebePartitionHealth.setServicesInstalled(false);
context
.getPartitionListeners()
.forEach(l -> l.onBecomingInactive(context.getPartitionId(), term));

if (context.getRaftPartition().getRole() == Role.LEADER) {
LOG.info("Unexpected failures occurred when installing leader services, stepping down");
context.getRaftPartition().stepDown();
} else {
LOG.info(
"Unexpected failures occurred when installing follower services, transitioning to inactive");
context.getRaftPartition().goInactive();
}
}

@@ -71,15 +71,7 @@ private ActorFuture<Void> enqueueTransition(final List<PartitionStep> partitionS

private void transition(
final CompletableActorFuture<Void> future, final List<PartitionStep> steps) {
closePartition()
.onComplete(
(nothing, err) -> {
if (err == null) {
installPartition(future, new ArrayList<>(steps));
} else {
future.completeExceptionally(err);
}
});
closePartition().onComplete((nothing, err) -> installPartition(future, new ArrayList<>(steps)));
}

private void installPartition(
@@ -99,6 +91,7 @@ private void installPartition(
(value, err) -> {
if (err != null) {
LOG.error("Expected to open step '{}' but failed with", step.getName(), err);
tryCloseStep(step);
future.completeExceptionally(err);
} else {
openedSteps.add(step);
@@ -107,23 +100,41 @@ private void installPartition(
});
}

private void tryCloseStep(final PartitionStep step) {
// close if there's anything to close. Don't add to 'opened' list, since the open did not
// complete, the close might also fail but that shouldn't prevent the next transition
try {
step.close(context);
} catch (final Exception e) {
LOG.debug("Couldn't close partition step '{}' that failed to open", step.getName(), e);
}
}

private CompletableActorFuture<Void> closePartition() {
final var closingSteps = new ArrayList<>(openedSteps);
Collections.reverse(closingSteps);
return closeSteps(closingSteps);
}

private CompletableActorFuture<Void> closeSteps(final List<PartitionStep> steps) {
final var closingPartitionFuture = new CompletableActorFuture<Void>();
stepByStepClosing(closingPartitionFuture, closingSteps);

stepByStepClosing(closingPartitionFuture, steps, null);
return closingPartitionFuture;
}

private void stepByStepClosing(
final CompletableActorFuture<Void> future, final List<PartitionStep> steps) {
final CompletableActorFuture<Void> future,
final List<PartitionStep> steps,
final Throwable throwable) {
if (steps.isEmpty()) {
LOG.debug(
"Partition {} closed all previous open resources, before transitioning.",
context.getPartitionId());
future.complete(null);
if (throwable == null) {
future.complete(null);
} else {
future.completeExceptionally(throwable);
}
return;
}

@@ -132,26 +143,22 @@ private void stepByStepClosing(

final ActorFuture<Void> closeFuture = step.close(context);
closeFuture.onComplete(
(v, t) -> {
if (t == null) {
(v, closingError) -> {
if (closingError == null) {
LOG.debug(
"Closing Zeebe-Partition-{}: {} closed successfully",
context.getPartitionId(),
step.getName());

// remove the completed step from the list in case that the closing is interrupted
openedSteps.remove(step);

// closing the remaining steps
stepByStepClosing(future, steps);
} else {
LOG.error(
"Closing Zeebe-Partition-{}: {} failed to close",
"Closing Zeebe-Partition-{}: {} failed to close. Closing remaining steps",
context.getPartitionId(),
step.getName(),
t);
future.completeExceptionally(t);
closingError);
}

openedSteps.remove(step);
stepByStepClosing(future, steps, throwable != null ? throwable : closingError);
});
}
}
@@ -8,6 +8,7 @@
package io.zeebe.broker.system.partitions.impl.steps;

import io.atomix.storage.journal.JournalReader.Mode;
import io.zeebe.broker.Loggers;
import io.zeebe.broker.system.partitions.PartitionContext;
import io.zeebe.broker.system.partitions.PartitionStep;
import io.zeebe.util.sched.future.ActorFuture;
@@ -24,8 +25,15 @@ public ActorFuture<Void> open(final PartitionContext context) {

@Override
public ActorFuture<Void> close(final PartitionContext context) {
context.getRaftLogReader().close();
context.setRaftLogReader(null);
try {
context.getRaftLogReader().close();
} catch (final Exception e) {
Loggers.SYSTEM_LOGGER.error(
"Unexpected error closing Raft log reader for partition {}", context.getPartitionId(), e);
} finally {
context.setRaftLogReader(null);
}

return CompletableActorFuture.completed(null);
}

@@ -59,11 +59,7 @@ protected void onActorClosing() {

@Override
public ActorFuture<Void> onBecomingFollower(final int partitionId, final long term) {
return actor.call(
() -> {
requestHandler.removePartition(partitionId);
cleanLeadingPartition(partitionId);
});
return removeLeaderHandlersAsync(partitionId);
}

@Override
@@ -97,6 +93,19 @@ public ActorFuture<Void> onBecomingLeader(
return future;
}

@Override
public ActorFuture<Void> onBecomingInactive(final int partitionId, final long term) {
return removeLeaderHandlersAsync(partitionId);
}

private ActorFuture<Void> removeLeaderHandlersAsync(final int partitionId) {
return actor.call(
() -> {
requestHandler.removePartition(partitionId);
cleanLeadingPartition(partitionId);
});
}

private void cleanLeadingPartition(final int partitionId) {
leadPartitions.remove(partitionId);
removeForPartitionId(partitionId);
@@ -96,6 +96,11 @@ public ActorFuture<Void> onBecomingLeader(
leaderLatch.countDown();
return CompletableActorFuture.completed(null);
}

@Override
public ActorFuture<Void> onBecomingInactive(final int partitionId, final long term) {
return CompletableActorFuture.completed(null);
}
});

// when