From b60579494e9d4b87c6f1cbfa1f8bbe93ab823d16 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Sun, 3 Jul 2022 21:18:44 -0700 Subject: [PATCH 1/5] Fix NamespaceDao to stop updating namespace record on conflict Signed-off-by: Michael Collado --- .../main/java/marquez/db/NamespaceDao.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/api/src/main/java/marquez/db/NamespaceDao.java b/api/src/main/java/marquez/db/NamespaceDao.java index 6f566f84fc..a7f36e2336 100644 --- a/api/src/main/java/marquez/db/NamespaceDao.java +++ b/api/src/main/java/marquez/db/NamespaceDao.java @@ -78,7 +78,27 @@ default Namespace upsertNamespaceMeta( @SqlQuery("SELECT * FROM namespaces ORDER BY name LIMIT :limit OFFSET :offset") List findAll(int limit, int offset); - @SqlQuery( + default NamespaceRow upsertNamespaceRow( + UUID uuid, Instant now, String name, String currentOwnerName) { + doUpsertNamespaceRow(uuid, now, name, currentOwnerName); + return findNamespaceByName(name).orElseThrow(); + } + + /** + * This query is executed by the OpenLineage write path, meaning namespaces are written to a LOT. + * Updating the record to modify the updated_at timestamp means the same namespace is often under + * heavy contention unnecessarily (it's really not being updated), causing some requests to wait + * for a lock while other requests are finishing. If a single namespace is under heavy contention, + * this can cause some requests to wait a long time - i.e., minutes. This causes unacceptable + * latency and failures in the write path. Avoid any updates in this query to avoid unnecessary + * locks.
+ * + * @param uuid the uuid for the new row; unused when a row with the same name already exists + * @param now the timestamp bound to :now in the INSERT + * @param name the namespace name; the insert is skipped when this name already exists + * @param currentOwnerName the owner name bound to :currentOwnerName in the INSERT + */ + @SqlUpdate( "INSERT INTO namespaces ( " + "uuid, " + "created_at, " @@ -91,10 +111,8 @@ default Namespace upsertNamespaceMeta( + ":now, " + ":name, " + ":currentOwnerName) " - + "ON CONFLICT(name) DO " - + "UPDATE SET updated_at = EXCLUDED.updated_at " - + "RETURNING *") - NamespaceRow upsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName); + + "ON CONFLICT(name) DO NOTHING") + void doUpsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName); @SqlQuery( "INSERT INTO namespaces ( " From a3a7e694c9a022d39b65ad362edefa0f26842d91 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Sun, 3 Jul 2022 23:06:21 -0700 Subject: [PATCH 2/5] Update JobContextDao to stop locking job context on conflict Signed-off-by: Michael Collado --- .../main/java/marquez/db/JobContextDao.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/marquez/db/JobContextDao.java b/api/src/main/java/marquez/db/JobContextDao.java index 0f66c21a86..2d2cae779d 100644 --- a/api/src/main/java/marquez/db/JobContextDao.java +++ b/api/src/main/java/marquez/db/JobContextDao.java @@ -12,20 +12,26 @@ import marquez.db.models.JobContextRow; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; @RegisterRowMapper(JobContextRowMapper.class) public interface JobContextDao { @SqlQuery("SELECT * FROM job_contexts WHERE uuid = :uuid") Optional findContextByUuid(UUID uuid); - @SqlQuery( + @SqlQuery("SELECT * FROM job_contexts WHERE checksum=:checksum") + Optional findContextByChecksum(String checksum); + + default JobContextRow upsert(UUID uuid, Instant now, String context, String checksum) { + doUpsert(uuid, now, context, checksum); + return findContextByChecksum(checksum).orElseThrow(); + } + + @SqlUpdate( "INSERT INTO job_contexts
" + "(uuid, created_at, context, checksum) " + "VALUES " + "(:uuid, :now, :context, :checksum) " - + "ON CONFLICT (checksum) DO " - + "UPDATE SET " - + "context = EXCLUDED.context " - + "RETURNING *") - JobContextRow upsert(UUID uuid, Instant now, String context, String checksum); + + "ON CONFLICT (checksum) DO NOTHING") + void doUpsert(UUID uuid, Instant now, String context, String checksum); } From d22fa1083ab9bf3d556e43d084d2b3ef3a17f140 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Sun, 3 Jul 2022 23:45:07 -0700 Subject: [PATCH 3/5] Update RunArgsDao to stop locking run args on conflict Signed-off-by: Michael Collado --- api/src/main/java/marquez/db/RunArgsDao.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/marquez/db/RunArgsDao.java b/api/src/main/java/marquez/db/RunArgsDao.java index 12f52c7bdc..973abec616 100644 --- a/api/src/main/java/marquez/db/RunArgsDao.java +++ b/api/src/main/java/marquez/db/RunArgsDao.java @@ -6,15 +6,25 @@ package marquez.db; import java.time.Instant; +import java.util.Optional; import java.util.UUID; import marquez.db.mappers.RunArgsRowMapper; import marquez.db.models.RunArgsRow; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; @RegisterRowMapper(RunArgsRowMapper.class) public interface RunArgsDao { - @SqlQuery( + default RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum) { + doUpsertRunArgs(uuid, now, args, checksum); + return findRunArgsByChecksum(checksum).orElseThrow(); + } + + @SqlQuery("SELECT * FROM run_args WHERE checksum=:checksum") + Optional findRunArgsByChecksum(String checksum); + + @SqlUpdate( "INSERT INTO run_args ( " + "uuid, " + "created_at, " @@ -25,9 +35,6 @@ public interface RunArgsDao { + ":now, " + ":args, " + ":checksum " - + ") ON CONFLICT(checksum) DO " - + "UPDATE SET " - + "args = :args " - + "RETURNING *") 
- RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum); + + ") ON CONFLICT(checksum) DO NOTHING") + void doUpsertRunArgs(UUID uuid, Instant now, String args, String checksum); } From b882e50a8c9761dcecc4646ea37c79faa76b9904 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Tue, 12 Jul 2022 14:15:18 -0700 Subject: [PATCH 4/5] Remove transaction annotations Signed-off-by: Michael Collado --- api/src/main/java/marquez/db/JobVersionDao.java | 7 ++++--- api/src/main/java/marquez/db/OpenLineageDao.java | 2 -- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 6ac8025ab4..28fcc5eb18 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -36,7 +36,6 @@ import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; -import org.jdbi.v3.sqlobject.transaction.Transaction; /** The DAO for {@code JobVersion}. */ @RegisterRowMapper(ExtendedJobVersionRowMapper.class) @@ -294,7 +293,6 @@ default List findOutputDatasetsFor(UUID jobVersionUuid) { * @param transitionedAt The timestamp of the run state transition. * @return A {@link BagOfJobVersionInfo} object. */ - @Transaction default BagOfJobVersionInfo upsertJobVersionOnRunTransition( @NonNull String namespaceName, @NonNull String jobName, @@ -327,7 +325,10 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( final Version jobVersion = Utils.newJobVersionFor( NamespaceName.of(jobRow.getNamespaceName()), - JobName.of(jobRow.getName()), + JobName.of( + Optional.ofNullable(jobRow.getParentJobName()) + .map(pn -> pn + "." 
+ jobRow.getSimpleName()) + .orElse(jobRow.getName())), toDatasetIds(jobVersionInputs), toDatasetIds(jobVersionOutputs), jobContext, diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 9ee3272f6a..f2b69735bb 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -55,7 +55,6 @@ import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; import org.jdbi.v3.sqlobject.statement.SqlUpdate; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.postgresql.util.PGobject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +82,6 @@ void createLineageEvent( PGobject event, String producer); - @Transaction default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) { UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper); RunState runState = getRunState(event.getEventType()); From 0d1f82d9f46f95733fbf2bfde96a151b5e1f3d32 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Thu, 14 Jul 2022 13:46:02 -0700 Subject: [PATCH 5/5] Update changelog Signed-off-by: Michael Collado --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb785b4c4a..7c72ae7ea0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ ### Changed * Set default limit for listing datasets and jobs in UI from `2000` to `25` [#2018](https://github.com/MarquezProject/marquez/pull/2018) [@wslulciuc](https://github.com/wslulciuc) +* Update OpenLineage write API to be non-transactional and avoid unnecessary locks on records under heavy contention [@collado-mike](https://github.com/collado-mike) ### Fixed