Skip to content

Commit

Permalink
Update OpenLineageDao to handle airflow run uuid conflicts (#2097)
Browse files Browse the repository at this point in the history
* Update OpenLineageDao to handle airflow run uuid conflicts

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update integration test job names to stop conflicting with jobs that have parents

Signed-off-by: Michael Collado <collado.mike@gmail.com>

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike committed Sep 1, 2022
1 parent df8cc76 commit 27b54ed
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 86 deletions.
10 changes: 5 additions & 5 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ public static UUID toNameBasedUuid(String... nameParts) {
public static UUID findParentRunUuid(ParentRunFacet parent) {
String jobName = parent.getJob().getName();
String parentRunId = parent.getRun().getRunId();
return findParentRunUuid(jobName, parentRunId);
return findParentRunUuid(parent.getJob().getNamespace(), jobName, parentRunId);
}

public static UUID findParentRunUuid(String parentJobName, String parentRunId) {
public static UUID findParentRunUuid(String namespace, String parentJobName, String parentRunId) {
String dagName = parseParentJobName(parentJobName);
return toUuid(parentRunId, dagName);
return toUuid(namespace, parentRunId, dagName);
}

public static String parseParentJobName(String parentJobName) {
Expand All @@ -214,11 +214,11 @@ public static String parseParentJobName(String parentJobName) {
* @param jobName
* @return
*/
public static UUID toUuid(@NotNull String runId, String jobName) {
public static UUID toUuid(@NotNull String namespace, @NotNull String runId, String jobName) {
try {
return UUID.fromString(runId);
} catch (IllegalArgumentException e) {
return Utils.toNameBasedUuid(jobName, runId);
return Utils.toNameBasedUuid(namespace, jobName, runId);
}
}

Expand Down
195 changes: 134 additions & 61 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,67 +160,17 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
Optional<JobRow> parentJob =
parentUuid.map(
uuid -> {
try {
ParentRunFacet facet = parentRun.get(); // facet must be present
log.debug("Found parent run event {}", facet);
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
JobRow parentJobRow =
runDao
.findJobRowByRunUuid(uuid)
.orElseGet(
() -> {
JobRow newParentJobRow =
jobDao.upsertJob(
UUID.randomUUID(),
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
Utils.parseParentJobName(facet.getJob().getName()),
null,
jobContext.getUuid(),
location,
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(),
now,
"{}",
Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
});
log.debug("Found parent job record {}", parentJobRow);
return parentJobRow;
} catch (Exception e) {
throw new RuntimeException("Unable to insert parent run", e);
}
});
uuid ->
findParentJobRow(
event,
namespace,
jobContext,
location,
nominalStartTime,
nominalEndTime,
log,
parentRun.get(),
uuid));

// construct the simple name of the job by removing the parent prefix plus the dot '.' separator
String jobName =
Expand Down Expand Up @@ -379,6 +329,129 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
return bag;
}

/**
 * Resolves the parent {@link JobRow} for a run, creating the parent job and parent run records
 * when they do not already exist.
 *
 * <p>If a run row already exists for {@code uuid} but references a different namespace/job than
 * the one named in {@code facet}, the uuid is assumed to be a conflict produced by the Airflow
 * integration (which generated the same run UUID for DAGs with the same name in different
 * namespaces); a replacement name-based UUID is derived from the facet's namespace, the parsed
 * parent job name, and the conflicting uuid, and fresh parent records are created under it.
 *
 * @param event lineage event whose parent is being resolved
 * @param namespace namespace row the event was recorded under
 * @param jobContext job context to attach to a newly created parent job
 * @param location source location for a newly created parent job
 * @param nominalStartTime nominal start time for a newly created parent run
 * @param nominalEndTime nominal end time for a newly created parent run
 * @param log caller-supplied logger
 * @param facet parent run facet identifying the parent job and run
 * @param uuid candidate parent run UUID derived from {@code facet}
 * @return the existing or newly created parent job row
 * @throws RuntimeException wrapping any failure while reading or inserting the parent records
 */
private JobRow findParentJobRow(
LineageEvent event,
NamespaceRow namespace,
JobContextRow jobContext,
String location,
Instant nominalStartTime,
Instant nominalEndTime,
Logger log,
ParentRunFacet facet,
UUID uuid) {
try {
log.debug("Found parent run event {}", facet);
// Empty JSON array used as the input-datasets placeholder for a newly created parent job.
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
JobRow parentJobRow =
createRunDao()
.findJobRowByRunUuid(uuid)
.map(
j -> {
// Compare the stored job against the facet's namespace/name to detect uuid reuse.
String parentJobName = Utils.parseParentJobName(facet.getJob().getName());
if (j.getNamespaceName().equals(facet.getJob().getNamespace())
&& j.getName().equals(parentJobName)) {
return j;
} else {
// Addresses an Airflow integration bug that generated conflicting run UUIDs
// for DAGs that had the same name, but ran in different namespaces.
UUID parentRunUuid =
Utils.toNameBasedUuid(
facet.getJob().getNamespace(), parentJobName, uuid.toString());
log.warn(
"Parent Run id {} has a different job name '{}.{}' from facet '{}.{}'. "
+ "Assuming Run UUID conflict and generating a new UUID {}",
uuid,
j.getNamespaceName(),
j.getName(),
facet.getJob().getNamespace(),
facet.getJob().getName(),
parentRunUuid);
return createParentJobRunRecord(
event,
namespace,
jobContext,
location,
nominalStartTime,
nominalEndTime,
parentRunUuid,
facet,
inputs);
}
})
// No run row exists for this uuid yet: create the parent job and run as-is.
.orElseGet(
() ->
createParentJobRunRecord(
event,
namespace,
jobContext,
location,
nominalStartTime,
nominalEndTime,
uuid,
facet,
inputs));
log.debug("Found parent job record {}", parentJobRow);
return parentJobRow;
} catch (Exception e) {
// PGobject setup or any DAO call may fail; surface as unchecked with context.
throw new RuntimeException("Unable to insert parent run", e);
}
}

/**
 * Creates (upserts) the parent job row and its associated run, run-args, and run rows for the
 * parent described by {@code facet}, timestamped at the event's time (normalized to UTC).
 *
 * @param event lineage event that triggered creation; its event time becomes the record time
 * @param namespace namespace row the parent job is created under
 * @param jobContext job context attached to the new parent job
 * @param location source location for the new parent job
 * @param nominalStartTime nominal start time recorded on the new parent run
 * @param nominalEndTime nominal end time recorded on the new parent run
 * @param uuid run UUID under which the parent run row is upserted
 * @param facet parent run facet supplying the parent job name and external run id
 * @param inputs JSON placeholder for the parent job's input datasets
 * @return the newly upserted parent job row
 */
private JobRow createParentJobRunRecord(
LineageEvent event,
NamespaceRow namespace,
JobContextRow jobContext,
String location,
Instant nominalStartTime,
Instant nominalEndTime,
UUID uuid,
ParentRunFacet facet,
PGobject inputs) {
// Normalize the event time to UTC for all created/updated timestamps below.
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();
// NOTE(review): a logger is fetched on every call; a shared static constant would avoid this.
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
JobRow newParentJobRow =
createJobDao()
.upsertJob(
UUID.randomUUID(),
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
// Parent job name is the facet's job name with any task suffix stripped.
Utils.parseParentJobName(facet.getJob().getName()),
null,
jobContext.getUuid(),
location,
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

// Parent runs carry no arguments; upsert an empty-args row keyed by its checksum.
RunArgsRow argsRow =
createRunArgsDao()
.upsertRunArgs(UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
createRunDao()
.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
// Mirror the child event's state onto the parent run, if present.
Optional.ofNullable(event.getEventType()).map(this::getRunState).orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
}

default Set<DatasetId> toDatasetId(List<Dataset> datasets) {
Set<DatasetId> set = new HashSet<>();
if (datasets == null) {
Expand Down
75 changes: 63 additions & 12 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,12 @@ public void testOpenLineageJobHierarchyAirflowIntegration()
String task2Name = "task2";
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -237,10 +239,12 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration()
String task2Name = "task2";
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand All @@ -259,13 +263,55 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration()
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
UUID parentRunUuid = Utils.toNameBasedUuid(dagName, airflowParentRunId);
UUID parentRunUuid = Utils.toNameBasedUuid(NAMESPACE_NAME, dagName, airflowParentRunId);
assertThat(runsList.get(0)).hasFieldOrPropertyWithValue("id", parentRunUuid.toString());

List<Run> taskRunsList = client.listRuns(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(taskRunsList).hasSize(1);
}

@Test
public void testOpenLineageJobHierarchyAirflowIntegrationConflictingRunUuid()
throws ExecutionException, InterruptedException, TimeoutException {
// Regression test: two Airflow DAG runs share the same parent run id and DAG name but run in
// different namespaces. The server must treat them as distinct parent jobs rather than letting
// the second event clobber (or collide with) the first run's UUID.
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
ZonedDateTime startOfHour =
Instant.now()
.atZone(LineageTestUtils.LOCAL_ZONE)
.with(ChronoField.MINUTE_OF_HOUR, 0)
.with(ChronoField.SECOND_OF_MINUTE, 0);
ZonedDateTime endOfHour = startOfHour.plusHours(1);
String airflowParentRunId = UUID.randomUUID().toString();
String task1Name = "task1";
String dagName = "reused_dag_name";

// two dag runs with different namespaces - should result in two distinct jobs
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

String secondNamespace = "another_namespace";
RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, secondNamespace);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);

// The task in the first namespace keeps its parent linkage to the DAG job.
Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(job)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
.hasFieldOrPropertyWithValue("parentJobName", dagName);

// The second namespace gets its own top-level parent job with exactly one run.
Job parentJob = client.getJob(secondNamespace, dagName);
assertThat(parentJob)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(secondNamespace, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(secondNamespace, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
}

@Test
public void testOpenLineageJobHierarchySparkAndAirflow()
throws ExecutionException, InterruptedException, TimeoutException {
Expand All @@ -281,7 +327,8 @@ public void testOpenLineageJobHierarchySparkAndAirflow()
String sparkTaskName = "theSparkJob";
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

RunEvent sparkTask =
createRunEvent(
Expand All @@ -291,7 +338,8 @@ public void testOpenLineageJobHierarchySparkAndAirflow()
airflowTask1.getRun().getRunId().toString(),
sparkTaskName,
dagName + "." + task1Name,
Optional.empty());
Optional.empty(),
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, sparkTask);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -353,7 +401,8 @@ private RunEvent createAirflowRunEvent(
ZonedDateTime endOfHour,
String airflowParentRunId,
String taskName,
String dagName) {
String dagName,
String namespace) {
RunFacet airflowVersionFacet = ol.newRunFacet();
airflowVersionFacet
.getAdditionalProperties()
Expand All @@ -366,7 +415,8 @@ private RunEvent createAirflowRunEvent(
airflowParentRunId,
taskName,
dagName,
Optional.of(airflowVersionFacet));
Optional.of(airflowVersionFacet),
namespace);
}

@NotNull
Expand All @@ -377,7 +427,8 @@ private RunEvent createRunEvent(
String airflowParentRunId,
String taskName,
String dagName,
Optional<RunFacet> airflowVersionFacet) {
Optional<RunFacet> airflowVersionFacet,
String namespace) {
// The Java SDK requires parent run ids to be a UUID, but the python SDK doesn't. In order to
// emulate requests coming in from older versions of the Airflow library, we log this as just
// a plain old RunFact, but using the "parent" key name. To Marquez, this will look just the
Expand All @@ -390,7 +441,7 @@ private RunEvent createRunEvent(
"run",
ImmutableMap.of("runId", airflowParentRunId),
"job",
ImmutableMap.of("namespace", NAMESPACE_NAME, "name", dagName + "." + taskName)));
ImmutableMap.of("namespace", namespace, "name", dagName + "." + taskName)));
RunFacetsBuilder runFacetBuilder =
ol.newRunFacetsBuilder()
.nominalTime(ol.newNominalTimeRunFacet(startOfHour, endOfHour))
Expand All @@ -402,7 +453,7 @@ private RunEvent createRunEvent(
.run(ol.newRun(UUID.randomUUID(), runFacetBuilder.build()))
.job(
ol.newJob(
NAMESPACE_NAME,
namespace,
dagName + "." + taskName,
ol.newJobFacetsBuilder()
.documentation(ol.newDocumentationJobFacet("the job docs"))
Expand Down
Loading

0 comments on commit 27b54ed

Please sign in to comment.