Skip to content

Commit

Permalink
[Improve][Zeta] Clean checkpoint file when job FINISHED/CANCELED (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Jun 14, 2024
1 parent 4aa5998 commit 670bba0
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,11 @@ public CompletableFuture<Void> listenPipeline(int pipelineId, PipelineStatus pip
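* Called by the JobMaster when the job is cleaned up. <br>
* Deletes the job's stored checkpoints when the {@link JobStatus} is FINISHED or CANCELED and
* the job did not end with a savepoint.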
*/
public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
public void clearCheckpointIfNeed(JobStatus jobStatus) {
// Only a job that ended as FINISHED or CANCELED is eligible for cleanup, and only when
// isSavePointEnd() is false (presumably so that savepoint data is kept -- confirm).
if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED)
&& !isSavePointEnd()) {
// The checkpoint storage is keyed by the job id rendered as a string.
checkpointStorage.deleteCheckpoint(jobId + "");
}
return CompletableFuture.completedFuture(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ public void releasePipelineResource(SubPlan subPlan) {
}

public void cleanJob() {
checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus());
jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo());
jobHistoryService.storeFinishedJobState(this);
removeJobIMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
Expand Down Expand Up @@ -94,8 +93,7 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
Assertions.assertNull(checkpointIdMap.get(1));
CompletableFuture<Void> future = checkpointManager.shutdown(JobStatus.FINISHED);
future.join();
checkpointManager.clearCheckpointIfNeed(JobStatus.FINISHED);
Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest {

public static String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf";
public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
public static String BATCH_CONF_WITH_CHECKPOINT_PATH =
"batch_fakesource_to_file_with_checkpoint.conf";

public static String STREAM_CONF_WITH_CHECKPOINT_PATH =
"stream_fake_to_console_with_checkpoint.conf";

@Override
public SeaTunnelConfig loadSeaTunnelConfig() {
Expand Down Expand Up @@ -113,4 +118,63 @@ public void testBatchJob() throws CheckpointStorageException {
checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(0, allCheckpoints.size());
}

/**
 * Runs the batch job described by {@link #BATCH_CONF_WITH_CHECKPOINT_PATH} (which sets a
 * checkpoint interval) and verifies that, once the job reports {@link JobStatus#FINISHED}, the
 * checkpoint storage holds no checkpoints for that job id.
 *
 * @throws CheckpointStorageException if the checkpoint storage cannot be queried
 */
@Test
public void testBatchJobWithCheckpoint() throws CheckpointStorageException {
    long jobId = System.currentTimeMillis();
    CheckpointConfig checkpointConfig =
            server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
    server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);

    // Build a storage client from the same plugin config the server uses, so the test can
    // inspect what the job left behind.
    CheckpointStorage checkpointStorage =
            FactoryUtil.discoverFactory(
                            Thread.currentThread().getContextClassLoader(),
                            CheckpointStorageFactory.class,
                            checkpointConfig.getStorage().getStorage())
                    .create(checkpointConfig.getStorage().getStoragePluginConfig());
    startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false);
    // JUnit's assertEquals takes (expected, actual); keeping that order makes a failure
    // message report the expected and the observed status correctly.
    await().atMost(120000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () ->
                            Assertions.assertEquals(
                                    JobStatus.FINISHED,
                                    server.getCoordinatorService().getJobStatus(jobId)));
    List<PipelineState> allCheckpoints =
            checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
    Assertions.assertEquals(0, allCheckpoints.size());
}

/**
 * Starts the streaming job described by {@link #STREAM_CONF_WITH_CHECKPOINT_PATH}, cancels it
 * after it has been running for a while, and verifies that once the job reports
 * {@link JobStatus#CANCELED} the checkpoint storage holds no checkpoints for that job id.
 *
 * @throws CheckpointStorageException if the checkpoint storage cannot be queried
 * @throws InterruptedException if the thread is interrupted while waiting for checkpoints
 */
@Test
public void testStreamJobWithCancel() throws CheckpointStorageException, InterruptedException {
    long jobId = System.currentTimeMillis();
    CheckpointConfig checkpointConfig =
            server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
    server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);

    // Build a storage client from the same plugin config the server uses, so the test can
    // inspect what the job left behind.
    CheckpointStorage checkpointStorage =
            FactoryUtil.discoverFactory(
                            Thread.currentThread().getContextClassLoader(),
                            CheckpointStorageFactory.class,
                            checkpointConfig.getStorage().getStorage())
                    .create(checkpointConfig.getStorage().getStoragePluginConfig());
    startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false);
    // JUnit's assertEquals takes (expected, actual); keeping that order makes a failure
    // message report the expected and the observed status correctly.
    await().atMost(120000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () ->
                            Assertions.assertEquals(
                                    JobStatus.RUNNING,
                                    server.getCoordinatorService().getJobStatus(jobId)));
    // wait for checkpoint
    Thread.sleep(10 * 1000);
    server.getCoordinatorService().getJobMaster(jobId).cancelJob();
    await().atMost(120000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () ->
                            Assertions.assertEquals(
                                    JobStatus.CANCELED,
                                    server.getCoordinatorService().getJobStatus(jobId)));
    List<PipelineState> allCheckpoints =
            checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
    Assertions.assertEquals(0, allCheckpoints.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing with checkpointing in SeaTunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 1000
}

source {
FakeSource {
row.num = 100
split.num = 5
split.read-interval = 3000
result_table_name = "fake"
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
}

transform {
}

sink {
LocalFile {
path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format_type="text"
sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error"

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in SeaTunnel config
######

env {
# You can set SeaTunnel environment configuration here
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 1000
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}

# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}

sink {
Console {
}

# If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}

0 comments on commit 670bba0

Please sign in to comment.