From feec8f13d8e17c85b206f04e9082d2efe91bb26b Mon Sep 17 00:00:00 2001 From: gaojun Date: Fri, 7 Jul 2023 18:04:02 +0800 Subject: [PATCH 1/3] update version to 2.3.3-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 51b03a26d5b..7dce624be31 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ - 2.3.2-SNAPSHOT + 2.3.3-SNAPSHOT 2.1.1 UTF-8 1.8 From a16cd5712821240502cc1f08d4b9578e19dbe239 Mon Sep 17 00:00:00 2001 From: gaojun Date: Mon, 10 Jul 2023 20:09:00 +0800 Subject: [PATCH 2/3] update dependency version in know dependencies file --- tools/dependencies/known-dependencies.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 3a1e736b68b..70bbd1c0df5 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -22,8 +22,8 @@ protostuff-collectionschema-1.8.0.jar protostuff-core-1.8.0.jar protostuff-runtime-1.8.0.jar scala-library-2.11.12.jar -seatunnel-jackson-2.3.2-SNAPSHOT-optional.jar -seatunnel-guava-2.3.2-SNAPSHOT-optional.jar +seatunnel-jackson-2.3.3-SNAPSHOT-optional.jar +seatunnel-guava-2.3.3-SNAPSHOT-optional.jar slf4j-api-1.7.25.jar jsqlparser-4.5.jar animal-sniffer-annotations-1.17.jar From c49a4498d0c682f865568f7d31de4b864c7454f5 Mon Sep 17 00:00:00 2001 From: gaojun Date: Thu, 13 Jul 2023 12:31:02 +0800 Subject: [PATCH 3/3] Add logs to find job restore from master active switch error --- .../server/checkpoint/CheckpointManager.java | 4 ++++ .../server/dag/physical/PhysicalVertex.java | 2 +- .../engine/server/dag/physical/SubPlan.java | 15 ++++++++++++--- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 9f9649f03a1..f34ae2f6a0a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -47,6 +47,7 @@ import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.extern.slf4j.Slf4j; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -167,6 +168,9 @@ public PassiveCompletableFuture triggerSavepoint(int pipeli } public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) { + log.info( + "reported pipeline running stack: " + + Arrays.toString(Thread.currentThread().getStackTrace())); getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java index 65666413548..3c840a269ad 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java @@ -394,7 +394,7 @@ private boolean turnToEndState(@NonNull ExecutionState endState) { public boolean updateTaskState( @NonNull ExecutionState current, @NonNull ExecutionState targetState) { synchronized (this) { - LOGGER.fine( + LOGGER.info( String.format( "Try to update the task %s state from %s to %s", taskFullName, current, targetState)); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index bc9e3e2aaef..83dd4e9d0f2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -53,7 +53,7 @@ public class SubPlan { private static final ILogger LOGGER = Logger.getLogger(SubPlan.class); /** The max num pipeline can restore. */ - public static final int PIPELINE_MAX_RESTORE_NUM = 2; // TODO should set by config + public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO should set by config private final List physicalVertexList; @@ -332,6 +332,9 @@ private void turnToEndState(@NonNull PipelineStatus endState) throws Exception { exception -> ExceptionUtil.isOperationNeedRetryException(exception), Constant.OPERATION_RETRY_SLEEP)); this.currPipelineStatus = endState; + LOGGER.info( + String.format( + "%s turn to end state %s.", pipelineFullName, currPipelineStatus)); } } @@ -511,11 +514,17 @@ private void resetPipelineState() throws Exception { LOGGER.severe(message); throw new IllegalStateException(message); } - + LOGGER.info( + String.format( + "Reset pipeline %s state to %s", + getPipelineFullName(), PipelineStatus.CREATED)); updateStateTimestamps(PipelineStatus.CREATED); runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED); this.currPipelineStatus = PipelineStatus.CREATED; - ; + LOGGER.info( + String.format( + "Reset pipeline %s state to %s complete", + getPipelineFullName(), PipelineStatus.CREATED)); return null; }, new RetryUtils.RetryMaterial(