Skip to content

Commit

Permalink
Fill run state of the parent run when it is created by child run (#2296)
Browse files Browse the repository at this point in the history
Signed-off-by: Minkyu Park <minkyu.park.200@gmail.com>

Signed-off-by: Minkyu Park <minkyu.park.200@gmail.com>
Co-authored-by: Willy Lulciuc <willy@datakin.com>
  • Loading branch information
fm100 and wslulciuc committed Dec 8, 2022
1 parent ff94d56 commit 1bbcb6f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 19 deletions.
49 changes: 32 additions & 17 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,25 +498,40 @@ private JobRow createParentJobRunRecord(
RunArgsRow argsRow =
createRunArgsDao()
.upsertRunArgs(UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));

Optional<RunState> runState = Optional.ofNullable(event.getEventType()).map(this::getRunState);
RunDao runDao = createRunDao();
RunRow newRow =
createRunDao()
.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType()).map(this::getRunState).orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
runState.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);

runState
.map(rs -> createRunStateDao().upsert(UUID.randomUUID(), now, uuid, rs))
.ifPresent(
runStateRow -> {
UUID runStateUuid = runStateRow.getUuid();
if (RunState.valueOf(runStateRow.getState()).isDone()) {
runDao.updateEndState(uuid, now, runStateUuid);
} else {
runDao.updateStartState(uuid, now, runStateUuid);
}
});

return newParentJobRow;
}

Expand Down
12 changes: 10 additions & 2 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -52,6 +53,7 @@
import marquez.client.models.Run;
import marquez.common.Utils;
import marquez.db.LineageTestUtils;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jdbi.v3.core.Jdbi;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -170,7 +172,7 @@ public void testSendOpenLineageEventFailsJsonProcessing() throws IOException {
}

@Test
public void testGetLineageForNonExistantDataset() {
public void testGetLineageForNonExistentDataset() {
CompletableFuture<Integer> response =
this.fetchLineage("dataset:Imadethisup:andthistoo")
.thenApply(HttpResponse::statusCode)
Expand Down Expand Up @@ -419,7 +421,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentOnStartEventO
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
assertThat(runsList)
.isNotEmpty()
.hasSize(1)
.first()
.extracting("startedAt", as(InstanceOfAssertFactories.OPTIONAL))
.get()
.isNotNull();
}

@Test
Expand Down

0 comments on commit 1bbcb6f

Please sign in to comment.