From 0f9a56fbe75c93936051d7248c6c1e0858644d2c Mon Sep 17 00:00:00 2001 From: William Lo Date: Thu, 20 Apr 2023 23:55:31 -0400 Subject: [PATCH 1/2] Let flow execution ID propagate to the Job ID if it exists --- .../java/org/apache/gobblin/cluster/HelixUtils.java | 4 +++- .../java/org/apache/gobblin/util/JobLauncherUtils.java | 10 ++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 54e5a8108ab..6efc6fff01c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -61,6 +61,7 @@ import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.util.Id; import org.apache.gobblin.util.JobLauncherUtils; +import org.apache.gobblin.util.PropertiesUtils; import static org.apache.helix.task.TaskState.STOPPED; @@ -159,7 +160,8 @@ public static List> initBaseEventTags(Properties jobProps, if (jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)) { jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY); } else { - jobId = JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps)); + jobId = JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps), + PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis())); jobProps.put(ConfigurationKeys.JOB_ID_KEY, jobId); } diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java index 401d159be82..c459ae1e04a 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java @@ -68,6 +68,16 @@ public static String newJobId(String jobName) { return Id.Job.create(jobName, System.currentTimeMillis()).toString(); } + /** + * Create a new job ID from a flow execution ID. + * + * @param jobName job name + * @return new job ID + */ + public static String newJobId(String jobName, long flowId) { + return Id.Job.create(jobName, flowId).toString(); + } + /** * Create a new task ID for the job with the given job ID. * From 1b09770a304f030eab7773bd7d7e21f4c082cdf7 Mon Sep 17 00:00:00 2001 From: William Lo Date: Fri, 21 Apr 2023 17:08:48 -0400 Subject: [PATCH 2/2] Address comment --- .../main/java/org/apache/gobblin/util/JobLauncherUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java index c459ae1e04a..42fae521e8e 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java @@ -74,8 +74,8 @@ public static String newJobId(String jobName) { * @param jobName job name * @return new job ID */ - public static String newJobId(String jobName, long flowId) { - return Id.Job.create(jobName, flowId).toString(); + public static String newJobId(String jobName, long executionId) { + return Id.Job.create(jobName, executionId).toString(); } /**