Skip to content

Commit

Permalink
[Improve][Zeta] Add restore when commit failed (apache#6101)
Browse files Browse the repository at this point in the history
* [Improve][Zeta] Add restore when commit failed

* update

* update
  • Loading branch information
Hisoka-X authored and chaorongzhi committed Aug 21, 2024
1 parent e9ebb01 commit 15c52ff
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.Getter;
Expand Down Expand Up @@ -260,7 +261,8 @@ protected void reportedTask(TaskReportStatusOperation operation) {
});
}

private void handleCoordinatorError(String message, Throwable e, CheckpointCloseReason reason) {
@VisibleForTesting
public void handleCoordinatorError(String message, Throwable e, CheckpointCloseReason reason) {
LOG.error(message, e);
handleCoordinatorError(reason, e);
}
Expand All @@ -277,8 +279,7 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) {
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
CheckpointCoordinatorStatus.FAILED, errorByPhysicalVertex.get()));
checkpointManager.handleCheckpointError(
pipelineId, reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED));
checkpointManager.handleCheckpointError(pipelineId, false);
}

private void restoreTaskState(TaskLocation taskLocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
Expand Down Expand Up @@ -188,7 +189,8 @@ public void reportCheckpointErrorFromTask(TaskLocation taskLocation, String erro
getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg);
}

private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
@VisibleForTesting
public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
if (coordinator == null) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.master;

import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import com.hazelcast.internal.serialization.Data;
import com.hazelcast.map.IMap;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

/** JobMaster Tester. */
@DisabledOnOs(OS.WINDOWS)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class JobMasterTest extends AbstractSeaTunnelServerTest {
/**
* IMap key is jobId and value is a Tuple2 Tuple2 key is JobMaster init timestamp and value is
* the jobImmutableInformation which is sent by client when submit job
*
* <p>This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node
* active
*/
private IMap<Long, JobInfo> runningJobInfoIMap;

/**
* IMap key is one of jobId {@link
* org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and {@link
* org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
*
* <p>The value of IMap is one of {@link JobStatus} {@link PipelineStatus} {@link
* org.apache.seatunnel.engine.server.execution.ExecutionState}
*
* <p>This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node
* active
*/
IMap<Object, Object> runningJobStateIMap;

/**
* IMap key is one of jobId {@link
* org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and {@link
* org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
*
* <p>The value of IMap is one of {@link
* org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan} stateTimestamps {@link
* org.apache.seatunnel.engine.server.dag.physical.SubPlan} stateTimestamps {@link
* org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex} stateTimestamps
*
* <p>This IMap is used to recovery runningJobStateTimestampsIMap in JobMaster when a new master
* node active
*/
IMap<Object, Long[]> runningJobStateTimestampsIMap;

/**
* IMap key is {@link PipelineLocation}
*
* <p>The value of IMap is map of {@link TaskGroupLocation} and the {@link SlotProfile} it used.
*
* <p>This IMap is used to recovery ownedSlotProfilesIMap in JobMaster when a new master node
* active
*/
private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;

@BeforeAll
public void before() {
super.before();
}

@Test
public void testHandleCheckpointTimeout() throws Exception {
long jobId = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
JobMaster jobMaster = newJobInstanceWithRunningState(jobId);

jobMaster.neverNeedRestore();
// call checkpoint timeout
jobMaster.handleCheckpointError(1, false);

PassiveCompletableFuture<JobResult> jobMasterCompleteFuture =
jobMaster.getJobMasterCompleteFuture();

// test job turn to complete
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
// Why equals CANCELED or FAILED? because handleCheckpointError
// should call by CheckpointCoordinator,
// before do this, CheckpointCoordinator should be failed. Anyway,
// use handleCheckpointError not good to test checkpoint timeout.
Assertions.assertTrue(
jobMasterCompleteFuture.isDone()
&& (JobStatus.CANCELED.equals(
jobMasterCompleteFuture
.get()
.getStatus())
|| JobStatus.FAILED.equals(
jobMasterCompleteFuture
.get()
.getStatus()))));

testIMapRemovedAfterJobComplete(jobId, jobMaster);
}

private void testIMapRemovedAfterJobComplete(long jobId, JobMaster jobMaster) {
runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");

await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertNull(runningJobInfoIMap.get(jobId));
Assertions.assertNull(runningJobStateIMap.get(jobId));
Assertions.assertNull(runningJobStateTimestampsIMap.get(jobId));
Assertions.assertNull(ownedSlotProfilesIMap.get(jobId));

jobMaster
.getPhysicalPlan()
.getPipelineList()
.forEach(
pipeline -> {
Assertions.assertNull(
runningJobStateIMap.get(
pipeline.getPipelineLocation()));

Assertions.assertNull(
runningJobStateTimestampsIMap.get(
pipeline.getPipelineLocation()));
});
jobMaster
.getPhysicalPlan()
.getPipelineList()
.forEach(
pipeline -> {
pipeline.getCoordinatorVertexList()
.forEach(
coordinator -> {
Assertions.assertNull(
runningJobStateIMap.get(
coordinator
.getTaskGroupLocation()));

Assertions.assertNull(
runningJobStateTimestampsIMap
.get(
coordinator
.getTaskGroupLocation()));
});

pipeline.getPhysicalVertexList()
.forEach(
task -> {
Assertions.assertNull(
runningJobStateIMap.get(
task
.getTaskGroupLocation()));

Assertions.assertNull(
runningJobStateTimestampsIMap
.get(
task
.getTaskGroupLocation()));
});
});
});
}

@Test
public void testCommitFailedWillRestore() throws Exception {
long jobId = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
JobMaster jobMaster = newJobInstanceWithRunningState(jobId);

// call checkpoint timeout
jobMaster
.getCheckpointManager()
.getCheckpointCoordinator(1)
.handleCoordinatorError(
"commit failed",
new RuntimeException(),
CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
Assertions.assertTrue(jobMaster.isNeedRestore());
}

private JobMaster newJobInstanceWithRunningState(long jobId) throws InterruptedException {
LogicalDag testLogicalDag =
TestUtils.createTestLogicalPlan(
"stream_fakesource_to_file.conf", "test_clear_coordinator_service", jobId);

JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
jobId,
"Test",
nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
Collections.emptyList(),
Collections.emptyList());

Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobId, data);
voidPassiveCompletableFuture.join();

JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);

// waiting for job status turn to running
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));

// Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job
// status become running again
Thread.sleep(5000);
return jobMaster;
}
}

0 comments on commit 15c52ff

Please sign in to comment.