Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix][zeta] Resolved the issue causing checkpoints to halt on tolerable-failure=0. #5263

Merged
merged 2 commits into from
Aug 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ seatunnel:
checkpoint:
interval: 10000
timeout: 60000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down
8 changes: 0 additions & 8 deletions docs/en/seatunnel-engine/checkpoint-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down Expand Up @@ -94,8 +92,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand All @@ -119,8 +115,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down Expand Up @@ -160,8 +154,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down
10 changes: 0 additions & 10 deletions docs/en/seatunnel-engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,6 @@ The interval between two checkpoints, unit is milliseconds. If the `checkpoint.i

The timeout of a checkpoint. If a checkpoint cannot be completed within the timeout period, a checkpoint failure will be triggered. Therefore, Job will be restored.

**max-concurrent**

How many checkpoints can be performed simultaneously at most.

**tolerable-failure**

Maximum number of retries after checkpoint failure.

Example

```
Expand All @@ -95,8 +87,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
max-concurrent: 1
tolerable-failure: 2
```

**checkpoint storage**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
max-concurrent: 1
tolerable-failure: 2
storage:
type: localfile
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,6 @@ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) {
getIntegerValue(
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(),
getTextContent(node)));
} else if (ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) {
checkpointConfig.setMaxConcurrentCheckpoints(
getIntegerValue(
ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(),
getTextContent(node)));
} else if (ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key().equals(name)) {
checkpointConfig.setTolerableFailureCheckpoints(
getIntegerValue(
ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key(),
getTextContent(node)));
} else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) {
checkpointConfig.setStorage(parseCheckpointStorageConfig(node));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ public class CheckpointConfig implements Serializable {
private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
private long schemaChangeCheckpointTimeout =
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue();
private int maxConcurrentCheckpoints =
ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue();
private int tolerableFailureCheckpoints =
ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.defaultValue();

private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();

Expand All @@ -60,18 +56,4 @@ public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) {
"The minimum checkpoint timeout is 10 ms.");
this.schemaChangeCheckpointTimeout = checkpointTimeout;
}

public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
checkArgument(
maxConcurrentCheckpoints >= 1,
"The minimum number of concurrent checkpoints is 1.");
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
}

public void setTolerableFailureCheckpoints(int tolerableFailureCheckpoints) {
checkArgument(
maxConcurrentCheckpoints >= 0,
"The number of tolerance failed checkpoints must be a natural number.");
this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,6 @@ public class ServerConfigOptions {
.withDescription(
"The timeout (in milliseconds) for a schema change checkpoint.");

public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT =
Options.key("max-concurrent")
.intType()
.defaultValue(1)
.withDescription("The maximum number of concurrent checkpoints.");

public static final Option<Integer> CHECKPOINT_TOLERABLE_FAILURE =
Options.key("tolerable-failure")
.intType()
.defaultValue(0)
.withDescription("The tolerable failure number of a checkpoint.");

public static final Option<String> CHECKPOINT_STORAGE_TYPE =
Options.key("type")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ public void testSeaTunnelConfig() {
Assertions.assertEquals(
7000, config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout());

Assertions.assertEquals(
1, config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints());

Assertions.assertEquals(
2, config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints());

Assertions.assertEquals(
"hdfs", config.getEngineConfig().getCheckpointConfig().getStorage().getStorage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public class CheckpointCoordinator {

private final CheckpointConfig coordinatorConfig;

private int tolerableFailureCheckpoints;
private transient ScheduledExecutorService scheduler;

private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
Expand Down Expand Up @@ -165,7 +164,6 @@ public CheckpointCoordinator(
this.runningJobStateIMap = runningJobStateIMap;
this.plan = plan;
this.coordinatorConfig = checkpointConfig;
this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints();
this.pendingCheckpoints = new ConcurrentHashMap<>();
this.completedCheckpoints =
new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
Expand Down Expand Up @@ -392,7 +390,6 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) {
if (currentTimestamp - latestTriggerTimestamp.get()
< coordinatorConfig.getCheckpointInterval()
|| pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints()
|| !isAllTaskReady) {
return;
}
Expand Down Expand Up @@ -531,16 +528,9 @@ private void startTriggerPendingCheckpoint(
if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId())
!= null
&& !pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0
|| pendingCheckpoint
.getCheckpointType()
.isSchemaChangeCheckpoint()) {
LOG.info(
"timeout checkpoint: "
+ pendingCheckpoint.getInfo());
handleCoordinatorError(
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
}
LOG.info("timeout checkpoint: " + pendingCheckpoint.getInfo());
handleCoordinatorError(
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
}
},
checkpointTimeout,
Expand Down Expand Up @@ -746,12 +736,6 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
notifyCompleted(completedCheckpoint);
pendingCheckpoints.remove(checkpointId);
pendingCounter.decrementAndGet();
if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
if (completedCheckpoint.getCheckpointType().notFinalCheckpoint()) {
scheduleTriggerPendingCheckpoint(0L);
}
}
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,6 @@ private CheckpointConfig createJobCheckpointConfig(
CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
jobCheckpointConfig.setMaxConcurrentCheckpoints(
defaultCheckpointConfig.getMaxConcurrentCheckpoints());
jobCheckpointConfig.setTolerableFailureCheckpoints(
defaultCheckpointConfig.getTolerableFailureCheckpoints());

CheckpointStorageConfig jobCheckpointStorageConfig = new CheckpointStorageConfig();
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
Expand Down
Loading