Update OpenLineageDao to handle Airflow run UUID conflicts #2097

Merged
merged 2 commits on
Sep 1, 2022
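Background for the diff below: when an Airflow integration reports a parent run id that is not a valid UUID, Marquez derives a deterministic, name-based UUID for the parent run. Before this change the namespace was not part of that derivation, so two DAGs with the same name running in different namespaces could be assigned the same parent run UUID. The sketch below is illustrative only — it uses Java's built-in UUID.nameUUIDFromBytes as a stand-in for Utils.toNameBasedUuid, whose actual implementation is not shown in this diff — but it demonstrates how omitting the namespace from the hashed parts produces the conflict this PR addresses.

import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Illustrative only: a simple name-based UUID derivation, NOT Marquez's actual
// Utils.toNameBasedUuid implementation.
class RunUuidConflictSketch {
  static UUID nameBasedUuid(String... parts) {
    return UUID.nameUUIDFromBytes(String.join(":", parts).getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    String airflowRunId = "scheduled__2022-09-01T00:00:00"; // not a valid UUID
    String dagName = "the_dag";

    // Old behavior: the namespace is ignored, so same-named DAGs collide.
    UUID teamA = nameBasedUuid(dagName, airflowRunId);
    UUID teamB = nameBasedUuid(dagName, airflowRunId);
    System.out.println(teamA.equals(teamB)); // true -> both DAGs resolve to one parent run

    // New behavior: the namespace participates in the derivation.
    UUID teamAScoped = nameBasedUuid("team_a", dagName, airflowRunId);
    UUID teamBScoped = nameBasedUuid("team_b", dagName, airflowRunId);
    System.out.println(teamAScoped.equals(teamBScoped)); // false -> distinct parent runs
  }
}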
10 changes: 5 additions & 5 deletions api/src/main/java/marquez/common/Utils.java
@@ -192,12 +192,12 @@ public static UUID toNameBasedUuid(String... nameParts) {
public static UUID findParentRunUuid(ParentRunFacet parent) {
String jobName = parent.getJob().getName();
String parentRunId = parent.getRun().getRunId();
return findParentRunUuid(jobName, parentRunId);
return findParentRunUuid(parent.getJob().getNamespace(), jobName, parentRunId);
}

public static UUID findParentRunUuid(String parentJobName, String parentRunId) {
public static UUID findParentRunUuid(String namespace, String parentJobName, String parentRunId) {
String dagName = parseParentJobName(parentJobName);
return toUuid(parentRunId, dagName);
return toUuid(namespace, parentRunId, dagName);
}

public static String parseParentJobName(String parentJobName) {
@@ -214,11 +214,11 @@ public static String parseParentJobName(String parentJobName) {
* @param jobName
* @return
*/
public static UUID toUuid(@NotNull String runId, String jobName) {
public static UUID toUuid(@NotNull String namespace, @NotNull String runId, String jobName) {
try {
return UUID.fromString(runId);
} catch (IllegalArgumentException e) {
return Utils.toNameBasedUuid(jobName, runId);
return Utils.toNameBasedUuid(namespace, jobName, runId);
}
}
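Net effect of the Utils change above: a parent run id that already parses as a UUID is used verbatim, and only non-UUID run ids (as emitted by older Airflow integrations) fall back to the name-based derivation, which now also mixes in the parent job's namespace. A minimal usage sketch, assuming only the signatures shown in this diff (the argument values are made up):

import java.util.UUID;
import marquez.common.Utils;

class ToUuidUsageSketch {
  public static void main(String[] args) {
    // Already a UUID: returned as-is, no name-based derivation.
    UUID passthrough =
        Utils.toUuid("my_namespace", "4fbcb95a-05f9-4ad9-9f8c-2f6f6c9c0a1e", "the_dag");

    // Old-Airflow-style run id (not a UUID): deterministically derived from
    // namespace + job name + run id, so equal inputs always map to the same UUID.
    UUID derived = Utils.toUuid("my_namespace", "scheduled__2022-09-01T00:00:00", "the_dag");

    System.out.println(passthrough + " / " + derived);
  }
}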

195 changes: 134 additions & 61 deletions api/src/main/java/marquez/db/OpenLineageDao.java
@@ -160,67 +160,17 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
Optional<JobRow> parentJob =
parentUuid.map(
uuid -> {
try {
ParentRunFacet facet = parentRun.get(); // facet must be present
log.debug("Found parent run event {}", facet);
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
JobRow parentJobRow =
runDao
.findJobRowByRunUuid(uuid)
.orElseGet(
() -> {
JobRow newParentJobRow =
jobDao.upsertJob(
UUID.randomUUID(),
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
Utils.parseParentJobName(facet.getJob().getName()),
null,
jobContext.getUuid(),
location,
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(),
now,
"{}",
Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
});
log.debug("Found parent job record {}", parentJobRow);
return parentJobRow;
} catch (Exception e) {
throw new RuntimeException("Unable to insert parent run", e);
}
});
uuid ->
findParentJobRow(
event,
namespace,
jobContext,
location,
nominalStartTime,
nominalEndTime,
log,
parentRun.get(),
uuid));

// construct the simple name of the job by removing the parent prefix plus the dot '.' separator
String jobName =
@@ -379,6 +329,129 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
return bag;
}

private JobRow findParentJobRow(
LineageEvent event,
NamespaceRow namespace,
JobContextRow jobContext,
String location,
Instant nominalStartTime,
Instant nominalEndTime,
Logger log,
ParentRunFacet facet,
UUID uuid) {
try {
log.debug("Found parent run event {}", facet);
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
JobRow parentJobRow =
createRunDao()
.findJobRowByRunUuid(uuid)
.map(
j -> {
String parentJobName = Utils.parseParentJobName(facet.getJob().getName());
if (j.getNamespaceName().equals(facet.getJob().getNamespace())
&& j.getName().equals(parentJobName)) {
return j;
} else {
// Addresses an Airflow integration bug that generated conflicting run UUIDs
// for DAGs that had the same name, but ran in different namespaces.
UUID parentRunUuid =
Utils.toNameBasedUuid(
facet.getJob().getNamespace(), parentJobName, uuid.toString());
log.warn(
"Parent Run id {} has a different job name '{}.{}' from facet '{}.{}'. "
+ "Assuming Run UUID conflict and generating a new UUID {}",
uuid,
j.getNamespaceName(),
j.getName(),
facet.getJob().getNamespace(),
facet.getJob().getName(),
parentRunUuid);
return createParentJobRunRecord(
event,
namespace,
jobContext,
location,
nominalStartTime,
nominalEndTime,
parentRunUuid,
facet,
inputs);
}
})
.orElseGet(
() ->
createParentJobRunRecord(
event,
namespace,
jobContext,
location,
nominalStartTime,
nominalEndTime,
uuid,
facet,
inputs));
log.debug("Found parent job record {}", parentJobRow);
return parentJobRow;
} catch (Exception e) {
throw new RuntimeException("Unable to insert parent run", e);
}
}

private JobRow createParentJobRunRecord(
LineageEvent event,
NamespaceRow namespace,
JobContextRow jobContext,
String location,
Instant nominalStartTime,
Instant nominalEndTime,
UUID uuid,
ParentRunFacet facet,
PGobject inputs) {
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
JobRow newParentJobRow =
createJobDao()
.upsertJob(
UUID.randomUUID(),
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
Utils.parseParentJobName(facet.getJob().getName()),
null,
jobContext.getUuid(),
location,
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
createRunArgsDao()
.upsertRunArgs(UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
createRunDao()
.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType()).map(this::getRunState).orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
}

default Set<DatasetId> toDatasetId(List<Dataset> datasets) {
Set<DatasetId> set = new HashSet<>();
if (datasets == null) {
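The heart of the fix is the branch inside findParentJobRow above: when a run row already exists for the derived parent UUID but belongs to a different namespace/job pair than the incoming ParentRunFacet, the DAO assumes a UUID collision, derives a replacement UUID that includes the namespace, and creates fresh parent job and run records under it. A self-contained sketch of just that decision (ExistingJob and resolveParentRunUuid are hypothetical stand-ins for illustration, not Marquez types):

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for the JobRow fields the check relies on.
record ExistingJob(String namespaceName, String jobName) {}

class ParentUuidConflictSketch {
  static UUID nameBasedUuid(String... parts) {
    return UUID.nameUUIDFromBytes(String.join(":", parts).getBytes(StandardCharsets.UTF_8));
  }

  // Mirrors the decision in findParentJobRow: keep the derived UUID unless the job
  // already recorded under it belongs to a different namespace/job.
  static UUID resolveParentRunUuid(
      UUID derivedUuid, Optional<ExistingJob> existing, String facetNamespace, String facetJobName) {
    return existing
        .filter(j -> !(j.namespaceName().equals(facetNamespace) && j.jobName().equals(facetJobName)))
        // Conflict: the UUID is taken by another namespace/job, so derive a namespaced one.
        .map(j -> nameBasedUuid(facetNamespace, facetJobName, derivedUuid.toString()))
        .orElse(derivedUuid);
  }

  public static void main(String[] args) {
    UUID derived = UUID.randomUUID();
    Optional<ExistingJob> existing = Optional.of(new ExistingJob("team_a", "reused_dag_name"));

    // Same namespace/job: no conflict, the derived UUID is kept.
    System.out.println(resolveParentRunUuid(derived, existing, "team_a", "reused_dag_name"));

    // Different namespace claimed the UUID: a new, namespace-scoped UUID is derived.
    System.out.println(resolveParentRunUuid(derived, existing, "team_b", "reused_dag_name"));
  }
}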
75 changes: 63 additions & 12 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
@@ -194,10 +194,12 @@ public void testOpenLineageJobHierarchyAirflowIntegration()
String task2Name = "task2";
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -237,10 +239,12 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration()
String task2Name = "task2";
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
@@ -259,13 +263,55 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration()
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
UUID parentRunUuid = Utils.toNameBasedUuid(dagName, airflowParentRunId);
UUID parentRunUuid = Utils.toNameBasedUuid(NAMESPACE_NAME, dagName, airflowParentRunId);
assertThat(runsList.get(0)).hasFieldOrPropertyWithValue("id", parentRunUuid.toString());

List<Run> taskRunsList = client.listRuns(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(taskRunsList).hasSize(1);
}

@Test
public void testOpenLineageJobHierarchyAirflowIntegrationConflictingRunUuid()
throws ExecutionException, InterruptedException, TimeoutException {
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
ZonedDateTime startOfHour =
Instant.now()
.atZone(LineageTestUtils.LOCAL_ZONE)
.with(ChronoField.MINUTE_OF_HOUR, 0)
.with(ChronoField.SECOND_OF_MINUTE, 0);
ZonedDateTime endOfHour = startOfHour.plusHours(1);
String airflowParentRunId = UUID.randomUUID().toString();
String task1Name = "task1";
String dagName = "reused_dag_name";

// Two DAG runs with the same name but different namespaces should result in two distinct parent jobs
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

String secondNamespace = "another_namespace";
RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, secondNamespace);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);

Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(job)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
.hasFieldOrPropertyWithValue("parentJobName", dagName);

Job parentJob = client.getJob(secondNamespace, dagName);
assertThat(parentJob)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(secondNamespace, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(secondNamespace, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
}

@Test
public void testOpenLineageJobHierarchySparkAndAirflow()
throws ExecutionException, InterruptedException, TimeoutException {
@@ -281,7 +327,8 @@ public void testOpenLineageJobHierarchySparkAndAirflow()
String sparkTaskName = "theSparkJob";
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName);
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);

RunEvent sparkTask =
createRunEvent(
@@ -291,7 +338,8 @@ public void testOpenLineageJobHierarchySparkAndAirflow()
airflowTask1.getRun().getRunId().toString(),
sparkTaskName,
dagName + "." + task1Name,
Optional.empty());
Optional.empty(),
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, sparkTask);
future.get(5, TimeUnit.SECONDS);
@@ -353,7 +401,8 @@ private RunEvent createAirflowRunEvent(
ZonedDateTime endOfHour,
String airflowParentRunId,
String taskName,
String dagName) {
String dagName,
String namespace) {
RunFacet airflowVersionFacet = ol.newRunFacet();
airflowVersionFacet
.getAdditionalProperties()
@@ -366,7 +415,8 @@ private RunEvent createAirflowRunEvent(
airflowParentRunId,
taskName,
dagName,
Optional.of(airflowVersionFacet));
Optional.of(airflowVersionFacet),
namespace);
}

@NotNull
@@ -377,7 +427,8 @@ private RunEvent createRunEvent(
String airflowParentRunId,
String taskName,
String dagName,
Optional<RunFacet> airflowVersionFacet) {
Optional<RunFacet> airflowVersionFacet,
String namespace) {
// The Java SDK requires parent run ids to be a UUID, but the python SDK doesn't. In order to
// emulate requests coming in from older versions of the Airflow library, we log this as just
// a plain old RunFacet, but using the "parent" key name. To Marquez, this will look just the
@@ -390,7 +441,7 @@ private RunEvent createRunEvent(
"run",
ImmutableMap.of("runId", airflowParentRunId),
"job",
ImmutableMap.of("namespace", NAMESPACE_NAME, "name", dagName + "." + taskName)));
ImmutableMap.of("namespace", namespace, "name", dagName + "." + taskName)));
RunFacetsBuilder runFacetBuilder =
ol.newRunFacetsBuilder()
.nominalTime(ol.newNominalTimeRunFacet(startOfHour, endOfHour))
@@ -402,7 +453,7 @@ private RunEvent createRunEvent(
.run(ol.newRun(UUID.randomUUID(), runFacetBuilder.build()))
.job(
ol.newJob(
NAMESPACE_NAME,
namespace,
dagName + "." + taskName,
ol.newJobFacetsBuilder()
.documentation(ol.newDocumentationJobFacet("the job docs"))
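As the comment in createRunEvent notes, older (Python-based) Airflow integrations may send a parent run id that is not a UUID, and the test emulates that by writing the parent information as a plain run facet under the "parent" key. A rough sketch of the JSON shape this produces (the run id and names here are made up, and the exact payload emitted by the OpenLineage client may differ):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

class LegacyParentFacetSketch {
  public static void main(String[] args) throws Exception {
    // Emulated old-Airflow parent facet: a generic run facet under the "parent" key,
    // whose runId is not a UUID, so the server must derive a name-based one.
    Object parent =
        ImmutableMap.of(
            "run", ImmutableMap.of("runId", "scheduled__2022-09-01T00:00:00"),
            "job", ImmutableMap.of("namespace", "my_namespace", "name", "the_dag.task1"));
    System.out.println(new ObjectMapper().writeValueAsString(ImmutableMap.of("parent", parent)));
    // -> {"parent":{"run":{"runId":"..."},"job":{"namespace":"my_namespace","name":"the_dag.task1"}}}
  }
}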