Skip to content

Commit

Permalink
Fix/symlinked jobs in queries (#2053)
Browse files Browse the repository at this point in the history
* Update jobs update function to dedupe aliases

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update search query to accommodate symlinked jobs and aliases

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update lineage query to include symlinked jobs in lineage

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Updated search test to validate symlink target jobs are returned

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike authored Aug 1, 2022
1 parent 81972b0 commit 90feb2e
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 10 deletions.
13 changes: 8 additions & 5 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public interface LineageDao {
+ " SELECT j.uuid AS job_uuid,\n"
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,\n"
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs\n"
+ " FROM jobs j\n"
+ " LEFT JOIN job_versions v on j.current_version_uuid = v.uuid\n"
+ " FROM jobs_view j\n"
+ " LEFT JOIN jobs_view s ON s.symlink_target_uuid=j.uuid\n"
+ " LEFT JOIN job_versions v on COALESCE(j.current_version_uuid, s.current_version_uuid) = v.uuid\n"
+ " LEFT JOIN job_versions_io_mapping io on v.uuid = io.job_version_uuid\n"
+ " GROUP BY j.uuid\n"
+ " ),\n"
Expand All @@ -60,9 +61,10 @@ public interface LineageDao {
+ " array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)\n"
+ " AND depth < :depth"
+ " )\n"
+ "SELECT DISTINCT ON (l2.job_uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+ "SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+ "FROM lineage l2\n"
+ "INNER JOIN jobs_view j ON j.uuid=l2.job_uuid\n"
+ "INNER JOIN jobs_view s ON s.uuid=l2.job_uuid\n"
+ "INNER JOIN jobs_view j ON j.uuid=COALESCE(s.symlink_target_uuid, s.uuid)\n"
+ "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

Expand All @@ -88,7 +90,8 @@ public interface LineageDao {
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " WHERE jv.job_uuid in (<jobUuid>)\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
+ ")\n"
+ "SELECT r.*, ra.args, ctx.context, f.facets,\n"
Expand Down
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/db/SearchDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ public interface SearchDao {
+ " FROM datasets AS d\n"
+ " WHERE d.name ilike '%' || :query || '%'\n"
+ " UNION\n"
+ " SELECT 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
+ " FROM jobs_view AS j\n"
+ " SELECT DISTINCT ON (j.namespace_name, j.name) \n"
+ " 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
+ " FROM (SELECT namespace_name, name, unnest(aliases) AS alias, updated_at \n"
+ " FROM jobs_view WHERE symlink_target_uuid IS NULL\n"
+ " ORDER BY updated_at DESC) AS j\n"
+ " WHERE j.name ilike '%' || :query || '%'\n"
+ " OR j.alias ilike '%' || :query || '%'\n"
+ ") AS results\n"
+ "WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL\n"
+ "ORDER BY :sort\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ BEGIN
INNER JOIN fqn jf ON jf.uuid = COALESCE(js.link_target_uuid, j.uuid)
ON CONFLICT (uuid) DO UPDATE
SET job_fqn=EXCLUDED.job_fqn,
aliases = jobs_fqn.aliases || EXCLUDED.aliases;
aliases = (SELECT array_agg(DISTINCT a) FROM (SELECT unnest(jobs_fqn.aliases) AS a UNION SELECT unnest(EXCLUDED.aliases) AS a) al);
END IF;
SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid;
return inserted_job;
Expand Down
101 changes: 101 additions & 0 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.base.Functions;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -25,10 +26,13 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import marquez.common.models.JobType;
import marquez.db.LineageTestUtils.DatasetConsumerJob;
import marquez.db.LineageTestUtils.JobLineage;
import marquez.db.models.DatasetData;
import marquez.db.models.JobData;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.UpdateLineageRow;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
Expand All @@ -44,6 +48,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.postgresql.util.PGobject;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class LineageDaoTest {
Expand Down Expand Up @@ -177,6 +182,102 @@ public void testGetLineage() {
}
}

@Test
public void testGetLineageForSymlinkedJob() throws SQLException {

  // Seed a lineage graph: one "writeJob" that produces `dataset`, fanned out to
  // 20 "readJob" consumers, each of which feeds exactly one "downstreamJob".
  UpdateLineageRow writeJob =
      LineageTestUtils.createLineageRow(
          openLineageDao,
          "writeJob",
          "COMPLETE",
          jobFacet,
          Arrays.asList(),
          Arrays.asList(dataset));
  List<JobLineage> jobRows =
      writeDownstreamLineage(
          openLineageDao,
          new LinkedList<>(
              Arrays.asList(
                  new DatasetConsumerJob("readJob", 20, Optional.of("outputData")),
                  new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
          jobFacet,
          dataset);

  // The namespace row is needed to upsert jobs directly through JobDao below.
  NamespaceRow namespaceRow =
      jdbi.onDemand(NamespaceDao.class)
          .findNamespaceByName(writeJob.getJob().getNamespaceName())
          .get();

  // JobDao.upsertJob takes the job inputs as a JSON PGobject; empty array here.
  PGobject inputs = new PGobject();
  inputs.setType("json");
  inputs.setValue("[]");

  // Create the symlink target first (no symlink_target_uuid of its own), then
  // re-upsert the original writeJob pointing its symlink_target_uuid at it.
  // Order matters: the target row must exist before the symlink references it.
  String symlinkTargetJobName = "A_new_write_job";
  JobRow targetJob =
      jdbi.onDemand(JobDao.class)
          .upsertJob(
              UUID.randomUUID(),
              JobType.valueOf(writeJob.getJob().getType()),
              writeJob.getJob().getCreatedAt(),
              namespaceRow.getUuid(),
              writeJob.getJob().getNamespaceName(),
              symlinkTargetJobName,
              writeJob.getJob().getDescription().orElse(null),
              writeJob.getJob().getJobContextUuid().orElse(null),
              writeJob.getJob().getLocation(),
              null,
              inputs);
  jdbi.onDemand(JobDao.class)
      .upsertJob(
          writeJob.getJob().getUuid(),
          JobType.valueOf(writeJob.getJob().getType()),
          writeJob.getJob().getCreatedAt(),
          namespaceRow.getUuid(),
          writeJob.getJob().getNamespaceName(),
          writeJob.getJob().getName(),
          writeJob.getJob().getDescription().orElse(null),
          writeJob.getJob().getJobContextUuid().orElse(null),
          writeJob.getJob().getLocation(),
          targetJob.getUuid(),
          inputs);

  // fetch the first "targetJob" lineage.
  Set<JobData> connectedJobs =
      lineageDao.getLineage(new HashSet<>(Arrays.asList(targetJob.getUuid())), 2);

  // 20 readJobs + 1 downstreamJob for each (20) + 1 write job = 41
  assertThat(connectedJobs).size().isEqualTo(41);

  Set<UUID> jobIds = connectedJobs.stream().map(JobData::getUuid).collect(Collectors.toSet());
  // The symlink target should appear in the lineage in place of the original
  // writeJob (the query resolves symlinked jobs to their targets).
  assertThat(jobIds).contains(targetJob.getUuid());

  // expect all downstream jobs (readJobs plus their downstreamJobs)
  Set<UUID> readJobUUIDs =
      jobRows.stream()
          .flatMap(row -> Stream.concat(Stream.of(row), row.getDownstreamJobs().stream()))
          .map(JobLineage::getId)
          .collect(Collectors.toSet());
  assertThat(jobIds).containsAll(readJobUUIDs);

  // Each downstream job's resolved input/output dataset UUIDs must match the
  // datasets recorded when the lineage rows were written.
  Map<UUID, JobData> actualJobRows =
      connectedJobs.stream().collect(Collectors.toMap(JobData::getUuid, Functions.identity()));
  for (JobLineage expected : jobRows) {
    JobData job = actualJobRows.get(expected.getId());
    assertThat(job.getInputUuids())
        .containsAll(
            expected.getInput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
    assertThat(job.getOutputUuids())
        .containsAll(
            expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
  }
  // Querying lineage by the ORIGINAL (symlinked) job uuid must yield the same
  // job set as querying by the symlink target — both resolve to one graph.
  Set<UUID> lineageForOriginalJob =
      lineageDao.getLineage(new HashSet<>(Arrays.asList(writeJob.getJob().getUuid())), 2).stream()
          .map(JobData::getUuid)
          .collect(Collectors.toSet());
  assertThat(lineageForOriginalJob).isEqualTo(jobIds);
}

@Test
public void testGetLineageWithJobThatHasNoDownstreamConsumers() {

Expand Down
67 changes: 65 additions & 2 deletions api/src/test/java/marquez/db/SearchDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.net.URL;
import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
Expand All @@ -15,17 +19,24 @@
import marquez.api.models.SearchFilter;
import marquez.api.models.SearchResult;
import marquez.api.models.SearchSort;
import marquez.common.Utils;
import marquez.common.models.JobType;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.JobMeta;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.postgresql.util.PGobject;

/** The test suite for {@link SearchDao}. */
@Tag("DataAccessTests")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class SearchDaoTest {

static final int LIMIT = 25;
static final int NUM_OF_JOBS = 2;
/**
Expand All @@ -34,10 +45,12 @@ public class SearchDaoTest {
*/
static final int NUM_OF_DATASETS = 12;

public static final String NEW_SYMLINK_TARGET_JOB = "a_new_symlink_target_job";

static SearchDao searchDao;

@BeforeAll
public static void setUpOnce(final Jdbi jdbi) {
public static void setUpOnce(final Jdbi jdbi) throws SQLException {
searchDao = jdbi.onDemand(SearchDao.class);

DbTestUtils.newDataset(jdbi, "name_ordering_0");
Expand All @@ -48,7 +61,51 @@ public static void setUpOnce(final Jdbi jdbi) {
DbTestUtils.newDataset(jdbi, "time_ordering_1");
DbTestUtils.newDataset(jdbi, "time_ordering_2");

DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);
ImmutableSet<JobRow> jobRows = DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);

// add a symlinked job - validate that the number of results is the same in the below unit test
jobRows.stream()
.findAny()
.ifPresent(
j -> {
try {
NamespaceRow namespaceRow =
jdbi.onDemand(NamespaceDao.class)
.findNamespaceByName(j.getNamespaceName())
.get();
JobRow symlinkTargetJob =
DbTestUtils.newJobWith(
jdbi,
namespaceRow.getName(),
NEW_SYMLINK_TARGET_JOB,
new JobMeta(
JobType.valueOf(j.getType()),
ImmutableSet.copyOf(j.getInputs()),
ImmutableSet.of(),
new URL(j.getLocation()),
ImmutableMap.of(),
j.getDescription().orElse(null),
null));
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue(Utils.getMapper().writeValueAsString(j.getInputs()));
jdbi.onDemand(JobDao.class)
.upsertJob(
j.getUuid(),
JobType.valueOf(j.getType()),
j.getCreatedAt(),
namespaceRow.getUuid(),
namespaceRow.getName(),
j.getName(),
j.getDescription().orElse(null),
j.getJobContextUuid().orElse(null),
j.getLocation(),
symlinkTargetJob.getUuid(),
inputs);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

@Test
Expand All @@ -72,6 +129,12 @@ public void testSearch() {
final List<SearchResult> resultsWithOnlyJobs =
resultsGroupedByType.get(SearchResult.ResultType.JOB);
assertThat(resultsWithOnlyJobs).hasSize(NUM_OF_JOBS);

// Even though we searched for "test" and the symlink target doesn't have "test" in its name,
// it is part of the search results because the original job had "test" in its name.
assertThat(resultsWithOnlyJobs)
.filteredOn(j -> j.getName().equals(NEW_SYMLINK_TARGET_JOB))
.isNotEmpty();
}

@Test
Expand Down

0 comments on commit 90feb2e

Please sign in to comment.