Skip to content

Commit

Permalink
[#9588, #10039] dst: Ignore intents from aborted subtransactions of o…
Browse files Browse the repository at this point in the history
…ther transactions during conflict resolution

Summary:
Before this commit, any intents of other transactions discovered during conflict resolution would have the same lifetime as the other transaction, even if those intents were part of an aborted subtransaction. This commit modifies the behavior to ignore all aborted intents of other transactions during conflict resolution, in a best-effort manner.

The changes we make to support this new behavior are as follows:
1. Send aborted subtransaction metadata to the transaction coordinator during transaction heartbeat
2. Send subtransaction metadata to transaction participant in GetTransactionStatus RPCs
3. Do not conflict with intents from aborted subtransactions in conflict resolution
4. Track whether any intents discovered in a given subtransaction are modification intents. If this is not the case for any subtransactions which are not aborted for a given transaction, and that transaction is committed, ignore it.

One important caveat is this now introduces a divergence from postgres behavior which was masked by the behavior we had before this commit. In postgres serializable isolation, any rows which are read implicitly acquire a read lock. Even if that read happened within a subtransaction which is later rolled back, the implicit read lock is tied to the scope of the outer transaction. We are tracking this here: #9587

Test Plan: ybd --java-test 'org.yb.pgsql.TestPgSavepoints'

Reviewers: sergei, pjain

Reviewed By: sergei, pjain

Subscribers: mbautin, zyu, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D17177
  • Loading branch information
robertsami committed Jun 1, 2022
1 parent f7f1319 commit 4fb1676
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 99 deletions.
70 changes: 68 additions & 2 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSavepoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ private OptionalInt getSingleValue(Connection c, int k) throws SQLException {

@Override
protected Map<String, String> getTServerFlags() {
// TODO(savepoints) -- enable by default.
Map<String, String> flags = super.getTServerFlags();
flags.put("txn_max_apply_batch_records", String.format("%d", LARGE_BATCH_ROW_THRESHOLD));
flags.put("TEST_inject_sleep_before_applying_intents_ms", "10000");
return flags;
}

Expand Down Expand Up @@ -134,7 +134,7 @@ public void testSavepointUpdateAbortedRow() throws Exception {
}

@Test
public void testAbortsIntentOfReleasedSavepoint() throws Exception {
public void testIgnoresIntentOfRolledBackSavepointSameTxn() throws Exception {
createTable();

for (IsolationLevel isoLevel: isoLevels) {
Expand All @@ -155,6 +155,72 @@ public void testAbortsIntentOfReleasedSavepoint() throws Exception {
}
}

@Test
public void testIgnoresIntentOfRolledBackSavepointCrossTxn() throws Exception {
createTable();

Connection conn1 = getConnectionBuilder()
.withIsolationLevel(IsolationLevel.READ_COMMITTED)
.withAutoCommit(AutoCommit.DISABLED)
.connect();
Connection conn2 = getConnectionBuilder()
.withIsolationLevel(IsolationLevel.READ_COMMITTED)
.withAutoCommit(AutoCommit.DISABLED)
.connect();

Statement s1 = conn1.createStatement();
Statement s2 = conn2.createStatement();

s1.execute("INSERT INTO t VALUES (1, 1)");
s1.execute("SAVEPOINT a");
s1.execute("INSERT INTO t VALUES (2, 1)");
s1.execute("ROLLBACK TO a");

s2.execute("INSERT INTO t VALUES (2, 2)");

conn1.commit();
conn2.commit();


assertEquals(OptionalInt.of(1), getSingleValue(conn1, 1));
assertEquals(OptionalInt.of(2), getSingleValue(conn1, 2));
}

@Test
public void testIgnoresLockOnlyConflictOfCommittedTxn() throws Exception {
createTable();

getConnectionBuilder().connect().createStatement()
.execute("INSERT INTO t SELECT generate_series(1, 10), 0");

Connection conn1 = getConnectionBuilder()
.withIsolationLevel(IsolationLevel.REPEATABLE_READ)
.withAutoCommit(AutoCommit.DISABLED)
.connect();
Connection conn2 = getConnectionBuilder()
.withIsolationLevel(IsolationLevel.REPEATABLE_READ)
.withAutoCommit(AutoCommit.DISABLED)
.connect();

Statement s1 = conn1.createStatement();
Statement s2 = conn2.createStatement();

s1.execute("SELECT * FROM t");
s2.execute("SELECT * FROM t");

s1.execute("SAVEPOINT a");
s1.execute("UPDATE t SET v=1 WHERE k=1");
s1.execute("ROLLBACK TO a");
s1.execute("SELECT * FROM t WHERE k=1 FOR UPDATE");
s1.execute("UPDATE t SET v=1 WHERE k=3");
conn1.commit();

s2.execute("UPDATE t SET v=2 WHERE k=1");
conn2.commit();

assertEquals(OptionalInt.of(2), getSingleValue(conn1, 1));
}

@Test
public void testStackedSavepoints() throws Exception {
createTable();
Expand Down
6 changes: 4 additions & 2 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -880,11 +880,14 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
req.set_tablet_id(status_tablet->tablet_id());
req.set_propagated_hybrid_time(manager_->Now().ToUint64());

// TODO(savepoints) -- Attach metadata about aborted subtransactions in heartbeat.
auto& state = *req.mutable_state();
state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
state.set_status(status);

if (subtransaction_.active()) {
subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set());
}

return UpdateTransaction(
deadline,
status_tablet.get(),
Expand Down Expand Up @@ -922,7 +925,6 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
req.set_propagated_hybrid_time(manager_->Now().ToUint64());
auto& state = *req.mutable_state();
state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
// TODO(savepoints) -- Attach metadata about aborted subtransactions to commit message.
state.set_status(seal_only ? TransactionStatus::SEALED : TransactionStatus::COMMITTED);
state.mutable_tablets()->Reserve(narrow_cast<int>(tablets_.size()));
for (const auto& tablet : tablets_) {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/common/transaction-test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TransactionStatusManagerMock : public TransactionStatusManager {
return HybridTime::kInvalid;
}

boost::optional<CommitMetadata> LocalCommitData(const TransactionId& id) override {
boost::optional<TransactionLocalState> LocalTxnData(const TransactionId& id) override {
return boost::none;
}

Expand Down
4 changes: 2 additions & 2 deletions src/yb/common/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct StatusRequest {

class RequestScope;

struct CommitMetadata {
struct TransactionLocalState {
HybridTime commit_ht;
AbortedSubTransactionSet aborted_subtxn_set;
};
Expand All @@ -135,7 +135,7 @@ class TransactionStatusManager {

// If this tablet is aware that this transaction has committed, returns the CommitMetadata for the
// transaction. Otherwise, returns boost::none.
virtual boost::optional<CommitMetadata> LocalCommitData(const TransactionId& id) = 0;
virtual boost::optional<TransactionLocalState> LocalTxnData(const TransactionId& id) = 0;

// Fetches status of specified transaction at specified time from transaction coordinator.
// Callback would be invoked in any case.
Expand Down
Loading

0 comments on commit 4fb1676

Please sign in to comment.