Skip to content

Commit

Permalink
[Improve] Improve CheckpointCoordinator notify complete when restore
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X committed Jul 22, 2023
1 parent 5e8d982 commit 5e68b5a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public enum CheckpointCloseReason {
CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."),
AGGREGATE_COMMIT_ERROR("Aggregate commit error."),
TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error");
TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error"),
CHECKPOINT_NOTIFY_COMPLETE_FAILED("Checkpoint notify complete failed");

private final String message;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) {
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
CheckpointCoordinatorStatus.FAILED, errorByPhysicalVertex.get()));
checkpointManager.handleCheckpointError(pipelineId);
checkpointManager.handleCheckpointError(
pipelineId, reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED));
}

private void restoreTaskState(TaskLocation taskLocation) {
Expand Down Expand Up @@ -316,9 +317,26 @@ private void allTaskReady() {
isAllTaskReady = true;
InvocationFuture<?>[] futures = notifyTaskStart();
CompletableFuture.allOf(futures).join();
notifyCompleted(latestCompletedCheckpoint);
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
}

/**
 * Calls {@code notifyCheckpointCompleted} with the id of the given checkpoint and blocks until
 * every returned future has finished.
 *
 * <p>A {@code null} checkpoint is ignored. Any {@link Throwable} raised while notifying or
 * waiting is passed to {@code handleCoordinatorError} together with
 * {@link CheckpointCloseReason#CHECKPOINT_NOTIFY_COMPLETE_FAILED}; it is not rethrown here.
 *
 * @param completedCheckpoint the checkpoint to announce as completed, may be {@code null}
 */
private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
    // Nothing to announce when no checkpoint is supplied.
    if (completedCheckpoint == null) {
        return;
    }
    try {
        LOG.info("start notify checkpoint completed, checkpoint:{}", completedCheckpoint);
        // join() blocks until all notification futures are done and surfaces their failure.
        CompletableFuture.allOf(
                        notifyCheckpointCompleted(completedCheckpoint.getCheckpointId()))
                .join();
    } catch (Throwable notifyFailure) {
        handleCoordinatorError(
                "notify checkpoint completed failed",
                notifyFailure,
                CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED);
    }
}

public InvocationFuture<?>[] notifyTaskStart() {
return plan.getPipelineSubtasks().stream()
.map(NotifyTaskStartOperation::new)
Expand Down Expand Up @@ -358,6 +376,7 @@ protected void restoreCoordinator(boolean alreadyStarted) {
shutdown = false;
if (alreadyStarted) {
isAllTaskReady = true;
notifyCompleted(latestCompletedCheckpoint);
tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
} else {
isAllTaskReady = false;
Expand Down Expand Up @@ -719,10 +738,9 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
completedCheckpoint.getCheckpointId(),
completedCheckpoint.getPipelineId(),
completedCheckpoint.getJobId());
InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(checkpointId);
CompletableFuture.allOf(invocationFutures).join();
// TODO: notifyCheckpointCompleted fail

latestCompletedCheckpoint = completedCheckpoint;
notifyCompleted(completedCheckpoint);
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) {
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
}

protected void handleCheckpointError(int pipelineId) {
jobMaster.handleCheckpointError(pipelineId);
protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
jobMaster.handleCheckpointError(pipelineId, neverRestore);
}

private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ public void run() {
}
}

public void handleCheckpointError(long pipelineId) {
public void handleCheckpointError(long pipelineId, boolean neverRestore) {
if (neverRestore) {
this.neverNeedRestore();
}
this.physicalPlan
.getPipelineList()
.forEach(
Expand Down

0 comments on commit 5e68b5a

Please sign in to comment.