From 15c52ff521a81cd89ff0cf81cbb8a7445341b4e6 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 11 Jan 2024 17:37:55 +0800 Subject: [PATCH] [Improve][Zeta] Add restore when commit failed (#6101) * [Improve][Zeta] Add restore when commit failed * update * update --- .../checkpoint/CheckpointCoordinator.java | 7 +- .../server/checkpoint/CheckpointManager.java | 4 +- .../engine/server/master/JobMasterTest.java | 254 ++++++++++++++++++ 3 files changed, 261 insertions(+), 4 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index b9bd7b6498aa..6a88f169fc55 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -45,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.Getter; @@ -260,7 +261,8 @@ protected void reportedTask(TaskReportStatusOperation operation) { }); } - private void handleCoordinatorError(String message, Throwable e, CheckpointCloseReason reason) { + @VisibleForTesting + public void handleCoordinatorError(String message, Throwable e, CheckpointCloseReason reason) { LOG.error(message, e); handleCoordinatorError(reason, e); } @@ -277,8 +279,7 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) { checkpointCoordinatorFuture.complete( new CheckpointCoordinatorState( CheckpointCoordinatorStatus.FAILED, errorByPhysicalVertex.get())); - checkpointManager.handleCheckpointError( - pipelineId, reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED)); + checkpointManager.handleCheckpointError(pipelineId, false); } private void restoreTaskState(TaskLocation taskLocation) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 941af4b791b3..d915cb5a79ce 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -44,6 +44,7 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; +import com.google.common.annotations.VisibleForTesting; import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; @@ -188,7 +189,8 @@ public void reportCheckpointErrorFromTask(TaskLocation taskLocation, String erro getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg); } - private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) { + @VisibleForTesting + public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) { CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId); if (coordinator == null) { throw new RuntimeException( diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java new file mode 100644 index 000000000000..dd886d923e56 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.master; + +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.core.job.JobInfo; +import org.apache.seatunnel.engine.core.job.JobResult; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.core.job.PipelineStatus; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.TestUtils; +import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason; +import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import com.hazelcast.internal.serialization.Data; +import com.hazelcast.map.IMap; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +/** JobMaster Tester. */ +@DisabledOnOs(OS.WINDOWS) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class JobMasterTest extends AbstractSeaTunnelServerTest { + /** + * IMap key is jobId and value is a Tuple2 Tuple2 key is JobMaster init timestamp and value is + * the jobImmutableInformation which is sent by client when submit job + * + *

This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node + * active + */ + private IMap runningJobInfoIMap; + + /** + * IMap key is one of jobId {@link + * org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and {@link + * org.apache.seatunnel.engine.server.execution.TaskGroupLocation} + * + *

The value of IMap is one of {@link JobStatus} {@link PipelineStatus} {@link + * org.apache.seatunnel.engine.server.execution.ExecutionState} + * + *

This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node + * active + */ + IMap runningJobStateIMap; + + /** + * IMap key is one of jobId {@link + * org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and {@link + * org.apache.seatunnel.engine.server.execution.TaskGroupLocation} + * + *

The value of IMap is one of {@link + * org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan} stateTimestamps {@link + * org.apache.seatunnel.engine.server.dag.physical.SubPlan} stateTimestamps {@link + * org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex} stateTimestamps + * + *

This IMap is used to recovery runningJobStateTimestampsIMap in JobMaster when a new master + * node active + */ + IMap runningJobStateTimestampsIMap; + + /** + * IMap key is {@link PipelineLocation} + * + *

The value of IMap is map of {@link TaskGroupLocation} and the {@link SlotProfile} it used. + * + *

This IMap is used to recovery ownedSlotProfilesIMap in JobMaster when a new master node + * active + */ + private IMap> ownedSlotProfilesIMap; + + @BeforeAll + public void before() { + super.before(); + } + + @Test + public void testHandleCheckpointTimeout() throws Exception { + long jobId = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId(); + JobMaster jobMaster = newJobInstanceWithRunningState(jobId); + + jobMaster.neverNeedRestore(); + // call checkpoint timeout + jobMaster.handleCheckpointError(1, false); + + PassiveCompletableFuture jobMasterCompleteFuture = + jobMaster.getJobMasterCompleteFuture(); + + // test job turn to complete + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + // Why equals CANCELED or FAILED? because handleCheckpointError + // should call by CheckpointCoordinator, + // before do this, CheckpointCoordinator should be failed. Anyway, + // use handleCheckpointError not good to test checkpoint timeout. + Assertions.assertTrue( + jobMasterCompleteFuture.isDone() + && (JobStatus.CANCELED.equals( + jobMasterCompleteFuture + .get() + .getStatus()) + || JobStatus.FAILED.equals( + jobMasterCompleteFuture + .get() + .getStatus())))); + + testIMapRemovedAfterJobComplete(jobId, jobMaster); + } + + private void testIMapRemovedAfterJobComplete(long jobId, JobMaster jobMaster) { + runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo"); + runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState"); + runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps"); + ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap"); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertNull(runningJobInfoIMap.get(jobId)); + Assertions.assertNull(runningJobStateIMap.get(jobId)); + Assertions.assertNull(runningJobStateTimestampsIMap.get(jobId)); + Assertions.assertNull(ownedSlotProfilesIMap.get(jobId)); + + jobMaster + .getPhysicalPlan() + .getPipelineList() + .forEach( + pipeline -> { + Assertions.assertNull( + runningJobStateIMap.get( + pipeline.getPipelineLocation())); + + Assertions.assertNull( + runningJobStateTimestampsIMap.get( + pipeline.getPipelineLocation())); + }); + jobMaster + .getPhysicalPlan() + .getPipelineList() + .forEach( + pipeline -> { + pipeline.getCoordinatorVertexList() + .forEach( + coordinator -> { + Assertions.assertNull( + runningJobStateIMap.get( + coordinator + .getTaskGroupLocation())); + + Assertions.assertNull( + runningJobStateTimestampsIMap + .get( + coordinator + .getTaskGroupLocation())); + }); + + pipeline.getPhysicalVertexList() + .forEach( + task -> { + Assertions.assertNull( + runningJobStateIMap.get( + task + .getTaskGroupLocation())); + + Assertions.assertNull( + runningJobStateTimestampsIMap + .get( + task + .getTaskGroupLocation())); + }); + }); + }); + } + + @Test + public void testCommitFailedWillRestore() throws Exception { + long jobId = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId(); + JobMaster jobMaster = newJobInstanceWithRunningState(jobId); + + // call checkpoint timeout + jobMaster + .getCheckpointManager() + .getCheckpointCoordinator(1) + .handleCoordinatorError( + "commit failed", + new RuntimeException(), + CheckpointCloseReason.AGGREGATE_COMMIT_ERROR); + Assertions.assertTrue(jobMaster.isNeedRestore()); + } + + private JobMaster newJobInstanceWithRunningState(long jobId) throws InterruptedException { + LogicalDag testLogicalDag = + TestUtils.createTestLogicalPlan( + "stream_fakesource_to_file.conf", "test_clear_coordinator_service", jobId); + + JobImmutableInformation jobImmutableInformation = + new JobImmutableInformation( + jobId, + "Test", + nodeEngine.getSerializationService().toData(testLogicalDag), + testLogicalDag.getJobConfig(), + Collections.emptyList(), + Collections.emptyList()); + + Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); + + PassiveCompletableFuture voidPassiveCompletableFuture = + server.getCoordinatorService().submitJob(jobId, data); + voidPassiveCompletableFuture.join(); + + JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId); + + // waiting for job status turn to running + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus())); + + // Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job + // status become running again + Thread.sleep(5000); + return jobMaster; + } +}