Skip to content

Commit

Permalink
implement inputFacets & outputFacets (MarquezProject#2417)
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Signed-off-by: Xavier-Cliquennois <xavier.cliquennois@wearegraphite.io>
  • Loading branch information
pawel-big-lebowski authored and Xavier-Cliquennois committed Jul 26, 2023
1 parent 0c83acd commit df25ec7
Show file tree
Hide file tree
Showing 25 changed files with 781 additions and 100 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.32.0...HEAD)

### Added

* Support `inputFacets` and `outputFacets` from Openlineage specificatio [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Adds the ability to store `inputFacets` / `outputFacets` which are sent within datasets.*
*Expose them through Marquez API as a member of `Run` resource.*

## [0.32.0](https://github.com/MarquezProject/marquez/compare/0.31.0...0.32.0) - 2023-03-20

### Fixed
Expand Down
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/common/models/InputDatasetVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

/**
* Class used to store dataset version and `inputFacets` which are assigned to datasets within
* OpenLineage spec, but are exposed within Marquez api as a part of {@link
* marquez.service.models.Run}
*/
@EqualsAndHashCode
@ToString
@Getter
public class InputDatasetVersion {

private final DatasetVersionId datasetVersionId;
private final ImmutableMap<String, Object> facets;

public InputDatasetVersion(
@JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId,
@JsonProperty("facets") @NonNull ImmutableMap<String, Object> facets) {
this.datasetVersionId = datasetVersionId;
this.facets = facets;
}
}
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/common/models/OutputDatasetVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

/**
* Class used to store dataset version and `outputFacets` which are assigned to datasets within
* OpenLineage spec, but are exposed within Marquez api as a part of {@link
* marquez.service.models.Run}
*/
@EqualsAndHashCode
@ToString
@Getter
public class OutputDatasetVersion {

private final DatasetVersionId datasetVersionId;
private final ImmutableMap<String, Object> facets;

public OutputDatasetVersion(
@JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId,
@JsonProperty("facets") @NonNull ImmutableMap<String, Object> facets) {
this.datasetVersionId = datasetVersionId;
this.facets = facets;
}
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private Columns() {}
public static final String NAMESPACE_NAME = "namespace_name";
public static final String DATASET_NAME = "dataset_name";
public static final String FACETS = "facets";
public static final String DATASET_FACETS = "dataset_facets";
public static final String TAGS = "tags";
public static final String IS_HIDDEN = "is_hidden";

Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ LEFT JOIN (
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
Expand Down Expand Up @@ -134,7 +134,7 @@ LEFT JOIN (
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
Expand Down
52 changes: 52 additions & 0 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,58 @@ default void insertDatasetFacetsFor(
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

default void insertInputDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@NonNull LineageEvent.InputDatasetFacets inputFacets) {
final Instant now = Instant.now();

JsonNode jsonNode = Utils.getMapper().valueToTree(inputFacets);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
.forEach(
fieldName ->
insertDatasetFacet(
now,
datasetUuid,
datasetVersionUuid,
runUuid,
lineageEventTime,
lineageEventType,
Type.INPUT,
fieldName,
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

default void insertOutputDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@NonNull LineageEvent.OutputDatasetFacets outputFacets) {
final Instant now = Instant.now();

JsonNode jsonNode = Utils.getMapper().valueToTree(outputFacets);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
.forEach(
fieldName ->
insertDatasetFacet(
now,
datasetUuid,
datasetVersionUuid,
runUuid,
lineageEventTime,
lineageEventType,
Type.OUTPUT,
fieldName,
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

record DatasetFacetRow(
Instant createdAt,
UUID datasetUuid,
Expand Down
49 changes: 32 additions & 17 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,19 @@ default void updateDatasetVersionMetric(

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.version = :version
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM dataset_versions dv
FROM selected_dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -169,21 +178,28 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.version = :version
""")
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findBy(UUID version);

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.uuid = :uuid
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM dataset_versions dv
FROM selected_dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -192,14 +208,12 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.uuid = :uuid
""")
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findByUuid(UUID uuid);

default Optional<DatasetVersion> findByWithRun(UUID version) {
Expand Down Expand Up @@ -246,6 +260,7 @@ LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
WHERE (type ILIKE 'dataset' OR type ILIKE 'unknown')
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.namespace_name = :namespaceName
Expand Down
24 changes: 24 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// InputFacets ...
Optional.ofNullable(dataset.getInputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertInputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}
bag.setInputs(Optional.ofNullable(datasetInputs));
Expand Down Expand Up @@ -314,6 +326,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// OutputFacets ...
Optional.ofNullable(dataset.getOutputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertOutputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}

Expand Down
Loading

0 comments on commit df25ec7

Please sign in to comment.