Skip to content

Commit

Permalink
[Improve][Zeta] Don't trigger handleSaveMode when restore (apache#5192)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and liunaijie committed Aug 12, 2023
1 parent 54bf5ea commit e9f0af5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private Set<URL> searchPluginJars() {

private MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
jobFilePath, idGenerator, jobConfig, commonPluginJars);
jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}

private LogicalDagGenerator getLogicalDagGenerator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,16 @@
public class JobConfigParser {
private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class);
private IdGenerator idGenerator;

private boolean isStartWithSavePoint;
private List<URL> commonPluginJars;

public JobConfigParser(@NonNull IdGenerator idGenerator, @NonNull List<URL> commonPluginJars) {
public JobConfigParser(
@NonNull IdGenerator idGenerator,
@NonNull List<URL> commonPluginJars,
boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.commonPluginJars = commonPluginJars;
this.isStartWithSavePoint = isStartWithSavePoint;
}

public Tuple2<CatalogTable, Action> parseSource(
Expand Down Expand Up @@ -190,7 +194,9 @@ public Tuple2<CatalogTable, Action> parseTransform(
sink.prepare(config);
sink.setJobContext(jobConfig.getJobContext());
sink.setTypeInfo(rowType);
handleSaveMode(sink);
if (!isStartWithSavePoint) {
handleSaveMode(sink);
}
final String actionName =
createSinkActionName(0, tuple.getLeft().getPluginName(), getTableName(config));
final SinkAction action =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,27 @@ public class MultipleTableJobConfigParser {
private final ReadonlyConfig envOptions;

private final JobConfigParser fallbackParser;
private final boolean isStartWithSavePoint;

public MultipleTableJobConfigParser(
String jobDefineFilePath, IdGenerator idGenerator, JobConfig jobConfig) {
this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList());
this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList(), false);
}

public MultipleTableJobConfigParser(
String jobDefineFilePath,
IdGenerator idGenerator,
JobConfig jobConfig,
List<URL> commonPluginJars) {
List<URL> commonPluginJars,
boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
this.commonPluginJars = commonPluginJars;
this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath));
this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.fallbackParser = new JobConfigParser(idGenerator, commonPluginJars);
this.fallbackParser =
new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint);
}

public ImmutablePair<List<Action>, Set<URL>> parse() {
Expand Down Expand Up @@ -607,7 +611,9 @@ private static <T> T findLast(LinkedHashMap<?, T> map) {
sink,
factoryUrls,
actionConfig);
handleSaveMode(sink);
if (!isStartWithSavePoint) {
handleSaveMode(sink);
}
sinkAction.setParallelism(parallelism);
return sinkAction;
}
Expand Down

0 comments on commit e9f0af5

Please sign in to comment.