Skip to content

Commit

Permalink
Avoid locking common records and remove transaction annotation (#2036)
Browse files Browse the repository at this point in the history
* Fix NamespaceDao to stop updating namespace record on conflict

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update JobContextDao to stop locking job context on conflict

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update RunArgsDao to stop locking run args on conflict

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Remove transaction annotations

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update changelog

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike authored Jul 15, 2022
1 parent cdee331 commit 78eeae0
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
### Changed

* Set default limit for listing datasets and jobs in UI from `2000` to `25` [#2018](https://github.com/MarquezProject/marquez/pull/2018) [@wslulciuc](https://github.com/wslulciuc)
* Update OpenLineage write API to be non-transactional and avoid unnecessary locks on records under heavy contention [@collado-mike](https://github.com/collado-mike)

### Fixed

Expand Down
18 changes: 12 additions & 6 deletions api/src/main/java/marquez/db/JobContextDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@
import marquez.db.models.JobContextRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(JobContextRowMapper.class)
public interface JobContextDao {
@SqlQuery("SELECT * FROM job_contexts WHERE uuid = :uuid")
Optional<JobContextRow> findContextByUuid(UUID uuid);

@SqlQuery(
@SqlQuery("SELECT * FROM job_contexts WHERE checksum=:checksum")
Optional<JobContextRow> findContextByChecksum(String checksum);

default JobContextRow upsert(UUID uuid, Instant now, String context, String checksum) {
doUpsert(uuid, now, context, checksum);
return findContextByChecksum(checksum).orElseThrow();
}

@SqlUpdate(
"INSERT INTO job_contexts "
+ "(uuid, created_at, context, checksum) "
+ "VALUES "
+ "(:uuid, :now, :context, :checksum) "
+ "ON CONFLICT (checksum) DO "
+ "UPDATE SET "
+ "context = EXCLUDED.context "
+ "RETURNING *")
JobContextRow upsert(UUID uuid, Instant now, String context, String checksum);
+ "ON CONFLICT (checksum) DO NOTHING")
void doUpsert(UUID uuid, Instant now, String context, String checksum);
}
7 changes: 4 additions & 3 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;

/** The DAO for {@code JobVersion}. */
@RegisterRowMapper(ExtendedJobVersionRowMapper.class)
Expand Down Expand Up @@ -294,7 +293,6 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
@Transaction
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
@NonNull String namespaceName,
@NonNull String jobName,
Expand Down Expand Up @@ -327,7 +325,10 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
final Version jobVersion =
Utils.newJobVersionFor(
NamespaceName.of(jobRow.getNamespaceName()),
JobName.of(jobRow.getName()),
JobName.of(
Optional.ofNullable(jobRow.getParentJobName())
.map(pn -> pn + "." + jobRow.getSimpleName())
.orElse(jobRow.getName())),
toDatasetIds(jobVersionInputs),
toDatasetIds(jobVersionOutputs),
jobContext,
Expand Down
28 changes: 23 additions & 5 deletions api/src/main/java/marquez/db/NamespaceDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,27 @@ default Namespace upsertNamespaceMeta(
@SqlQuery("SELECT * FROM namespaces ORDER BY name LIMIT :limit OFFSET :offset")
List<Namespace> findAll(int limit, int offset);

@SqlQuery(
default NamespaceRow upsertNamespaceRow(
UUID uuid, Instant now, String name, String currentOwnerName) {
doUpsertNamespaceRow(uuid, now, name, currentOwnerName);
return findNamespaceByName(name).orElseThrow();
}

/**
* This query is executed by the OpenLineage write path, meaning namespaces are written to a LOT.
* Updating the record to modify the updateAt timestamp means the same namespace is often under
* heavy contention unnecessarily (it's really not being updated), causing some requests to wait
* for a lock while other requests are finishing. If a single namespace is under heavy contention,
* this can cause some requests to wait a long time - i.e., minutes. This causes unacceptable
* latency and failures in the write path. Avoid any updates in this query to avoid unnecessary
* locks.
*
* @param uuid
* @param now
* @param name
* @param currentOwnerName
*/
@SqlUpdate(
"INSERT INTO namespaces ( "
+ "uuid, "
+ "created_at, "
Expand All @@ -91,10 +111,8 @@ default Namespace upsertNamespaceMeta(
+ ":now, "
+ ":name, "
+ ":currentOwnerName) "
+ "ON CONFLICT(name) DO "
+ "UPDATE SET updated_at = EXCLUDED.updated_at "
+ "RETURNING *")
NamespaceRow upsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName);
+ "ON CONFLICT(name) DO NOTHING")
void doUpsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName);

@SqlQuery(
"INSERT INTO namespaces ( "
Expand Down
2 changes: 0 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.postgresql.util.PGobject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,7 +82,6 @@ void createLineageEvent(
PGobject event,
String producer);

@Transaction
default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
RunState runState = getRunState(event.getEventType());
Expand Down
19 changes: 13 additions & 6 deletions api/src/main/java/marquez/db/RunArgsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,25 @@
package marquez.db;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.RunArgsRowMapper;
import marquez.db.models.RunArgsRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(RunArgsRowMapper.class)
public interface RunArgsDao {
@SqlQuery(
default RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum) {
doUpsertRunArgs(uuid, now, args, checksum);
return findRunArgsByChecksum(checksum).orElseThrow();
}

@SqlQuery("SELECT * FROM run_args WHERE checksum=:checksum")
Optional<RunArgsRow> findRunArgsByChecksum(String checksum);

@SqlUpdate(
"INSERT INTO run_args ( "
+ "uuid, "
+ "created_at, "
Expand All @@ -25,9 +35,6 @@ public interface RunArgsDao {
+ ":now, "
+ ":args, "
+ ":checksum "
+ ") ON CONFLICT(checksum) DO "
+ "UPDATE SET "
+ "args = :args "
+ "RETURNING *")
RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum);
+ ") ON CONFLICT(checksum) DO NOTHING")
void doUpsertRunArgs(UUID uuid, Instant now, String args, String checksum);
}

0 comments on commit 78eeae0

Please sign in to comment.