OL facets - PR2 - read facets from views based on lineage_events table #2355

Merged · 4 commits · Jan 31, 2023
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -166,6 +166,7 @@ jobs:
working_directory: ~/marquez
machine:
image: ubuntu-2004:current
resource_class: large
steps:
- checkout
- run: ./.circleci/get-docker-compose.sh
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -4,10 +4,14 @@

### Added

* Split the `lineage_events` table into `dataset_facets`, `run_facets`, and `job_facets` tables. [`2350`](https://github.com/MarquezProject/marquez/pull/2350), [`2355`](https://github.com/MarquezProject/marquez/pull/2355), [`2359`](https://github.com/MarquezProject/marquez/pull/2359)
  [@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
  * Performance improvements when storing and querying facets.
  * The migration procedure requires manual steps if the database has more than 100K lineage events.
  * We highly encourage users to review our [migration plan](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md).
* Additions to the seed data for column lineage [`#2381`](https://github.com/MarquezProject/marquez/pull/2381) [@rossturk](https://github.com/rossturk)
* Added a new `docker/down.sh` script that makes it easier to stop the local deployment when run detached [`#2380`](https://github.com/MarquezProject/marquez/pull/2380) [@rossturk](https://github.com/rossturk)


## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19

### Added
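Taken together, the entries above mean facet reads now hit dedicated facet tables (exposed through views such as `dataset_facets_view`) rather than re-parsing raw `lineage_events` JSON. A minimal sketch of the new read path, using only the view columns that appear in the diffs below (`dataset_uuid`, `run_uuid`, `lineage_event_time`, `facet`):

```sql
-- Hedged sketch: fetch all facets recorded for one dataset, oldest first.
-- :datasetUuid is a placeholder bind parameter, not an identifier from this PR.
SELECT run_uuid, lineage_event_time, facet
FROM dataset_facets_view
WHERE dataset_uuid = :datasetUuid
ORDER BY lineage_event_time ASC;
```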
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/MarquezApp.java
@@ -27,6 +27,7 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.api.filter.JobRedirectFilter;
import marquez.cli.DbMigrationCommand;
import marquez.cli.MetadataCommand;
import marquez.cli.SeedCommand;
import marquez.common.Utils;
@@ -149,6 +150,12 @@ public void registerResources(
}
}

@Override
protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
bootstrap.addCommand(new DbMigrationCommand<>(this));
super.addDefaultCommands(bootstrap);
}

private MarquezContext buildMarquezContext(
MarquezConfig config, Environment env, ManagedDataSource source) {
final JdbiFactory factory = new JdbiFactory();
113 changes: 113 additions & 0 deletions api/src/main/java/marquez/cli/DbMigrationCommand.java
@@ -0,0 +1,113 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.cli;

import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.db.ManagedDataSource;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import marquez.db.migrations.V57_1__BackfillFacets;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

/**
 * A command to manually run database migrations when needed. The migration involves a heavy DB
 * operation; exposing it as a separate command allows it to run asynchronously, with limited API
 * downtime.
*/
@Slf4j
public class DbMigrationCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> {

private static final String DB_MIGRATE = "db-migrate";
private static final String MIGRATION_V57_DESCRIPTION =
"""
A command to manually run V57 database migration.
Please refer to https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md for more details.
""";

private static final String COMMAND_DESCRIPTION =
"""
A command to manually run database migrations.
Extra parameters are required to specify the migration to run.
""";

/**
* Creates a new environment command.
*
* @param application the application providing this command
*/
public DbMigrationCommand(Application<marquez.MarquezConfig> application) {
super(application, DB_MIGRATE, COMMAND_DESCRIPTION);
}

@Override
public void configure(Subparser subparser) {
subparser
.addArgument("--chunkSize")
.dest("chunkSize")
.type(Integer.class)
.required(false)
.setDefault(V57_1__BackfillFacets.DEFAULT_CHUNK_SIZE)
.help("amount of lineage_events rows processed in a single SQL query and transaction.");

subparser
.addArgument("--version")
.dest("version")
.type(String.class)
.required(true)
.help("migration version to apply like 'v57'");

addFileArgument(subparser);
}

@Override
protected void run(
Environment environment, Namespace namespace, marquez.MarquezConfig configuration)
throws Exception {

final DataSourceFactory sourceFactory = configuration.getDataSourceFactory();
final DataSource source = sourceFactory.build(environment.metrics(), "MarquezApp-source");
final JdbiFactory factory = new JdbiFactory();

Jdbi jdbi =
factory
.build(
environment,
configuration.getDataSourceFactory(),
(ManagedDataSource) source,
"postgresql-command")
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());

MarquezMigrations.valueOf(namespace.getString("version")).run(jdbi, namespace);
}

enum MarquezMigrations {
v57 {
public void run(Jdbi jdbi, Namespace namespace) throws Exception {
log.info("Running V57_1__BackfillFacets migration");
V57_1__BackfillFacets migration = new V57_1__BackfillFacets();
migration.setManual(true);
migration.setJdbi(jdbi);
migration.setChunkSize(namespace.getInt("chunkSize"));
migration.migrate(null);
}
};

public void run(Jdbi jdbi, Namespace namespace) throws Exception {
throw new UnsupportedOperationException();
}
}
}
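
For context, a hedged usage sketch (the jar and config-file names here are illustrative assumptions, not taken from this PR): the migration could be invoked manually with something like `java -jar marquez-api.jar db-migrate --version v57 --chunkSize 10000 marquez.yml`, where `--version` selects the migration from the `MarquezMigrations` enum and `--chunkSize` caps how many `lineage_events` rows each query and transaction processes.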
53 changes: 23 additions & 30 deletions api/src/main/java/marquez/db/DatasetDao.java
@@ -76,22 +76,22 @@ WITH selected_datasets AS (
FROM datasets_view d
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
) df ON df.run_uuid = rim.run_uuid
Comment on lines 78 to 94

Collaborator: The only purpose of this CTE is to determine which runs have input or output facets for the current dataset version. Given that `dataset_facets_view` now has `dataset_version_uuid`, I think we can drop this whole subquery and join directly on `df.dataset_version_uuid = d.current_version_uuid` below.

Collaborator (Author): PR #2407
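
A minimal SQL sketch of that suggestion, assuming (per the comment) that `dataset_facets_view` exposes `dataset_version_uuid`; the actual change was deferred to PR #2407:

```sql
-- Hypothetical simplification: join the facets view directly to the
-- current dataset version instead of going through the dataset_runs CTE.
SELECT d.uuid AS dataset_uuid,
       JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM selected_datasets d
LEFT JOIN dataset_facets_view df
  ON df.dataset_version_uuid = d.current_version_uuid
WHERE df.facet IS NOT NULL
GROUP BY d.uuid
```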

)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
@@ -104,13 +104,9 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
@@ -148,22 +144,22 @@ WITH selected_datasets AS (
ORDER BY d.name
LIMIT :limit OFFSET :offset
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
@@ -176,13 +172,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid
ORDER BY d.name""")
15 changes: 8 additions & 7 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

@@ -70,9 +70,9 @@ public static Type typeFromName(@NonNull final String name) {
}

/**
* @param uuid
* @param createdAt
* @param datasetUuid
* @param datasetVersionUuid
* @param runUuid
* @param lineageEventTime
* @param lineageEventType
@@ -83,19 +83,19 @@ public static Type typeFromName(@NonNull final String name) {
@SqlUpdate(
"""
INSERT INTO dataset_facets (
uuid,
created_at,
dataset_uuid,
dataset_version_uuid,
run_uuid,
lineage_event_time,
lineage_event_type,
type,
name,
facet
) VALUES (
:uuid,
:createdAt,
:datasetUuid,
:datasetVersionUuid,
:runUuid,
:lineageEventTime,
:lineageEventType,
@@ -105,9 +105,9 @@ INSERT INTO dataset_facets (
)
""")
void insertDatasetFacet(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID datasetVersionUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
@@ -125,6 +125,7 @@ void insertDatasetFacet(
@Transaction
default void insertDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@@ -137,9 +138,9 @@ default void insertDatasetFacetsFor(
.forEach(
fieldName ->
insertDatasetFacet(
UUID.randomUUID(),
now,
datasetUuid,
datasetVersionUuid,
runUuid,
lineageEventTime,
lineageEventType,
@@ -149,9 +150,9 @@ default void insertDatasetFacetsFor(
}

record DatasetFacetRow(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID datasetVersionUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,