Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid locking common records and remove transaction annotation #2036

Merged
merged 6 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
### Changed

* Set default limit for listing datasets and jobs in UI from `2000` to `25` [#2018](https://github.com/MarquezProject/marquez/pull/2018) [@wslulciuc](https://github.com/wslulciuc)
* Update OpenLineage write API to be non-transactional and avoid unnecessary locks on records under heavy contention [#2036](https://github.com/MarquezProject/marquez/pull/2036) [@collado-mike](https://github.com/collado-mike)

### Fixed

Expand Down
18 changes: 12 additions & 6 deletions api/src/main/java/marquez/db/JobContextDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@
import marquez.db.models.JobContextRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(JobContextRowMapper.class)
public interface JobContextDao {
/**
 * Looks up a job context row by its {@code uuid} column.
 *
 * @param uuid value bound to {@code :uuid} in the query
 * @return the matching row, or an empty {@link Optional} when the query yields no row
 */
@SqlQuery("SELECT * FROM job_contexts WHERE uuid = :uuid")
Optional<JobContextRow> findContextByUuid(UUID uuid);

@SqlQuery(
/**
 * Looks up a job context row by its {@code checksum} column.
 *
 * @param checksum value bound to {@code :checksum} in the query
 * @return the matching row, or an empty {@link Optional} when the query yields no row
 */
@SqlQuery("SELECT * FROM job_contexts WHERE checksum=:checksum")
Optional<JobContextRow> findContextByChecksum(String checksum);

/**
 * Attempts to insert a job context row and then returns the row stored for {@code checksum}.
 *
 * <p>The insert statement returns nothing, so the row is read back with a separate lookup by
 * checksum.
 *
 * @param uuid uuid passed to the insert
 * @param now timestamp passed to the insert
 * @param context context payload passed to the insert
 * @param checksum checksum passed to the insert and used for the follow-up lookup
 * @return the row found for {@code checksum} after the insert attempt
 * @throws java.util.NoSuchElementException if the lookup finds no row for {@code checksum}
 */
default JobContextRow upsert(UUID uuid, Instant now, String context, String checksum) {
  doUpsert(uuid, now, context, checksum);
  final Optional<JobContextRow> stored = findContextByChecksum(checksum);
  return stored.orElseThrow();
}

@SqlUpdate(
"INSERT INTO job_contexts "
+ "(uuid, created_at, context, checksum) "
+ "VALUES "
+ "(:uuid, :now, :context, :checksum) "
+ "ON CONFLICT (checksum) DO "
+ "UPDATE SET "
+ "context = EXCLUDED.context "
+ "RETURNING *")
JobContextRow upsert(UUID uuid, Instant now, String context, String checksum);
+ "ON CONFLICT (checksum) DO NOTHING")
void doUpsert(UUID uuid, Instant now, String context, String checksum);
}
7 changes: 4 additions & 3 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;

/** The DAO for {@code JobVersion}. */
@RegisterRowMapper(ExtendedJobVersionRowMapper.class)
Expand Down Expand Up @@ -294,7 +293,6 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
@Transaction
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
@NonNull String namespaceName,
@NonNull String jobName,
Expand Down Expand Up @@ -327,7 +325,10 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
final Version jobVersion =
Utils.newJobVersionFor(
NamespaceName.of(jobRow.getNamespaceName()),
JobName.of(jobRow.getName()),
JobName.of(
Optional.ofNullable(jobRow.getParentJobName())
.map(pn -> pn + "." + jobRow.getSimpleName())
.orElse(jobRow.getName())),
toDatasetIds(jobVersionInputs),
toDatasetIds(jobVersionOutputs),
jobContext,
Expand Down
28 changes: 23 additions & 5 deletions api/src/main/java/marquez/db/NamespaceDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,27 @@ default Namespace upsertNamespaceMeta(
@SqlQuery("SELECT * FROM namespaces ORDER BY name LIMIT :limit OFFSET :offset")
List<Namespace> findAll(int limit, int offset);

@SqlQuery(
default NamespaceRow upsertNamespaceRow(
UUID uuid, Instant now, String name, String currentOwnerName) {
doUpsertNamespaceRow(uuid, now, name, currentOwnerName);
return findNamespaceByName(name).orElseThrow();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: The RETURNING * was to avoid inserting, then querying for the namespace in two separate queries. But I also understand the cost of always updating updated_at to trigger the modified row to be returned. At some point, I'd love to flesh out the usage of updated_at for all of our tables.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, RETURNING * only returns a row if the record was actually inserted or updated - if we want to DO NOTHING to avoid locking the record, we have to execute a separate query 🤷🏽‍♂️

}

/**
* This query is executed by the OpenLineage write path, meaning namespaces are written to a LOT.
* Updating the record to modify the updated_at timestamp means the same namespace is often under
* heavy contention unnecessarily (it's really not being updated), causing some requests to wait
* for a lock while other requests are finishing. If a single namespace is under heavy contention,
* this can cause some requests to wait a long time - i.e., minutes. This causes unacceptable
* latency and failures in the write path. Avoid any updates in this query to avoid unnecessary
* locks.
*
* @param uuid the uuid to insert for a new namespace row
* @param now the timestamp to insert for a new namespace row
* @param name the namespace name; an existing row with this name is left untouched
* @param currentOwnerName the owner name to insert for a new namespace row
*/
@SqlUpdate(
"INSERT INTO namespaces ( "
+ "uuid, "
+ "created_at, "
Expand All @@ -91,10 +111,8 @@ default Namespace upsertNamespaceMeta(
+ ":now, "
+ ":name, "
+ ":currentOwnerName) "
+ "ON CONFLICT(name) DO "
+ "UPDATE SET updated_at = EXCLUDED.updated_at "
+ "RETURNING *")
NamespaceRow upsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName);
+ "ON CONFLICT(name) DO NOTHING")
void doUpsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName);

@SqlQuery(
"INSERT INTO namespaces ( "
Expand Down
2 changes: 0 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.postgresql.util.PGobject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,7 +82,6 @@ void createLineageEvent(
PGobject event,
String producer);

@Transaction
default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
RunState runState = getRunState(event.getEventType());
Expand Down
19 changes: 13 additions & 6 deletions api/src/main/java/marquez/db/RunArgsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,25 @@
package marquez.db;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.RunArgsRowMapper;
import marquez.db.models.RunArgsRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(RunArgsRowMapper.class)
public interface RunArgsDao {
@SqlQuery(
/**
 * Attempts to insert a run args row and then returns the row stored for {@code checksum}.
 *
 * <p>The insert statement returns nothing, so the row is read back with a separate lookup by
 * checksum.
 *
 * @param uuid uuid passed to the insert
 * @param now timestamp passed to the insert
 * @param args run arguments payload passed to the insert
 * @param checksum checksum passed to the insert and used for the follow-up lookup
 * @return the row found for {@code checksum} after the insert attempt
 * @throws java.util.NoSuchElementException if the lookup finds no row for {@code checksum}
 */
default RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum) {
  doUpsertRunArgs(uuid, now, args, checksum);
  final Optional<RunArgsRow> stored = findRunArgsByChecksum(checksum);
  return stored.orElseThrow();
}

/**
 * Looks up a run args row by its {@code checksum} column.
 *
 * @param checksum value bound to {@code :checksum} in the query
 * @return the matching row, or an empty {@link Optional} when the query yields no row
 */
@SqlQuery("SELECT * FROM run_args WHERE checksum=:checksum")
Optional<RunArgsRow> findRunArgsByChecksum(String checksum);

@SqlUpdate(
"INSERT INTO run_args ( "
+ "uuid, "
+ "created_at, "
Expand All @@ -25,9 +35,6 @@ public interface RunArgsDao {
+ ":now, "
+ ":args, "
+ ":checksum "
+ ") ON CONFLICT(checksum) DO "
+ "UPDATE SET "
+ "args = :args "
+ "RETURNING *")
RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum);
+ ") ON CONFLICT(checksum) DO NOTHING")
void doUpsertRunArgs(UUID uuid, Instant now, String args, String checksum);
}