From e9f0af5bfa343c487d51cd8cf5d95ee2a18fa248 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 3 Aug 2023 18:18:54 +0800 Subject: [PATCH] [Improve][Zeta] Don't trigger handleSaveMode when restore (#5192) --- .../engine/client/job/JobExecutionEnvironment.java | 2 +- .../engine/core/parse/JobConfigParser.java | 12 +++++++++--- .../core/parse/MultipleTableJobConfigParser.java | 14 ++++++++++---- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java index 9f28f6fdbb0c..bf3169e4c803 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java @@ -131,7 +131,7 @@ private Set searchPluginJars() { private MultipleTableJobConfigParser getJobConfigParser() { return new MultipleTableJobConfigParser( - jobFilePath, idGenerator, jobConfig, commonPluginJars); + jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint); } private LogicalDagGenerator getLogicalDagGenerator() { diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java index d81de1702ed0..09bae74f5a24 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java @@ -63,12 +63,16 @@ public class JobConfigParser { private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class); private IdGenerator idGenerator; - + private boolean isStartWithSavePoint; private List commonPluginJars; - public JobConfigParser(@NonNull IdGenerator idGenerator, @NonNull List commonPluginJars) { + public JobConfigParser( + @NonNull IdGenerator idGenerator, + @NonNull List commonPluginJars, + boolean isStartWithSavePoint) { this.idGenerator = idGenerator; this.commonPluginJars = commonPluginJars; + this.isStartWithSavePoint = isStartWithSavePoint; } public Tuple2 parseSource( @@ -190,7 +194,9 @@ public Tuple2 parseTransform( sink.prepare(config); sink.setJobContext(jobConfig.getJobContext()); sink.setTypeInfo(rowType); - handleSaveMode(sink); + if (!isStartWithSavePoint) { + handleSaveMode(sink); + } final String actionName = createSinkActionName(0, tuple.getLeft().getPluginName(), getTableName(config)); final SinkAction action = diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 09027a2a248f..86c0f3c94f59 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -107,23 +107,27 @@ public class MultipleTableJobConfigParser { private final ReadonlyConfig envOptions; private final JobConfigParser fallbackParser; + private final boolean isStartWithSavePoint; public MultipleTableJobConfigParser( String jobDefineFilePath, IdGenerator idGenerator, JobConfig jobConfig) { - this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList()); + this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList(), false); } public MultipleTableJobConfigParser( String jobDefineFilePath, IdGenerator idGenerator, JobConfig jobConfig, - List commonPluginJars) { + List commonPluginJars, + boolean isStartWithSavePoint) { this.idGenerator = idGenerator; this.jobConfig = jobConfig; this.commonPluginJars = commonPluginJars; + this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath)); this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); - this.fallbackParser = new JobConfigParser(idGenerator, commonPluginJars); + this.fallbackParser = + new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); } public ImmutablePair, Set> parse() { @@ -607,7 +611,9 @@ private static T findLast(LinkedHashMap map) { sink, factoryUrls, actionConfig); - handleSaveMode(sink); + if (!isStartWithSavePoint) { + handleSaveMode(sink); + } sinkAction.setParallelism(parallelism); return sinkAction; }