diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgConflictWithAbortedSubTxns.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgConflictWithAbortedSubTxns.java new file mode 100644 index 000000000000..dea30f29772c --- /dev/null +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgConflictWithAbortedSubTxns.java @@ -0,0 +1,154 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. + +package org.yb.pgsql; + +import static org.yb.AssertionWrappers.*; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.yb.util.ThreadUtil; +import org.yb.util.YBTestRunnerNonTsanOnly; + +/** + * This test mimics the scenario in #12767 - + * + * Before this commit's fix, the following scenario leads to a situation where 2 single statement + * read committed transactions block each other - + * + * CREATE TABLE test (id int primary key, v timestamp with time zone NOT NULL); + * CREATE INDEX idx ON test USING btree (v); + * + * 1. 2 read committed isolation transactions are started in separate sessions with begin; + * 2. 
Both sessions get a request from the client to execute an INSERT ON CONFLICT DO UPDATE query + * on a row in the main table. + * + * INSERT INTO test AS x (id, v) VALUES (1, statement_timestamp()) + * ON CONFLICT ON CONSTRAINT test_pkey + * DO UPDATE SET v = statement_timestamp() WHERE x.id = excluded.id + * + * 3. Backends of both sessions first read the main table to check if the key already exists. Since + * it does, both backends then try to perform the ON CONFLICT DO UPDATE by issuing three rpcs + * simultaneously to tserver processes - + * + * [a] a PGSQL_UPDATE to the main table to update the v time + * [b] a PGSQL_DELETE to the secondary index to remove the entry with existing v value + * [c] a PGSQL_UPSERT to the secondary index to insert a new index entry with new v value + * + * 4. Rpc [a] of session 1 reaches the main table first and performs the write. + * Rpc [b] (and/or [c]) of session 2 reaches the index table first and performs the write there. + * + * 5. So, session 1 has successfully written a provisional entry to the main table and session 2 has + * successfully written provisional entries to the index table. + * + * 6. Now, the other rpcs in both sessions will fail due to a conflict i.e., rpc [b] and [c] of + * session 1 and rpc [a] of session 2 will fail. + * + * 7. Both sessions, after facing the conflict, will retry the statement by rolling back to the + * internal savepoint registered before the statement's execution (as is done by our read + * committed implementation). Rolling back will effectively mark the provisional data written by + * earlier rpcs in the statement as invalid so that the backends can retry their statement and no + * other transactions should be able to see those invalid provisional entries. + * + * 8. However, even after rolling back savepoints, other transactions consider the rolled back + * provisional entries for conflict resolution and hence still face conflicts. 
This is because + * the list of aborted sub txns isn't available to the txn participants during conflict + * detection. + * + * 9. This leads to both sessions retrying and facing false conflicts till statement timeout is hit. + */ +@RunWith(value = YBTestRunnerNonTsanOnly.class) +public class TestPgConflictWithAbortedSubTxns extends BasePgSQLTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestPgConflictWithAbortedSubTxns.class); + + private static final int NUM_THREADS = 9; + private static final int TEST_DURATION_SECS = 120; + private static final int STATEMENT_TIMEOUT_MS = 10 * 1000; + + @Override + protected Map getTServerFlags() { + Map flags = super.getTServerFlags(); + flags.put("yb_enable_read_committed_isolation", "true"); + flags.put("ysql_pg_conf_csv", "statement_timeout=" + STATEMENT_TIMEOUT_MS); + return flags; + } + + @Test + public void ensureNoConflictsWithAbortedSubTxns() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute( + "CREATE TABLE test (id int primary key, v timestamp with time zone NOT NULL)"); + statement.execute("CREATE INDEX idx ON test USING btree (v)"); + } + + ExecutorService es = Executors.newFixedThreadPool(NUM_THREADS); + List> futures = new ArrayList<>(); + List runnables = new ArrayList<>(); + + for (int i = 0; i < NUM_THREADS; i++) { + runnables.add(() -> { + try (Connection conn = + getConnectionBuilder().withIsolationLevel(IsolationLevel.READ_COMMITTED) + .withAutoCommit(AutoCommit.ENABLED).connect(); + Statement stmt = conn.createStatement();) { + + long end_time = System.currentTimeMillis() + TEST_DURATION_SECS*1000; + + while (System.currentTimeMillis() < end_time) { + stmt.execute("BEGIN"); + stmt.execute( + "INSERT INTO test AS x (id, v) VALUES (1, statement_timestamp()) " + + "ON CONFLICT ON CONSTRAINT test_pkey " + + "DO UPDATE SET v = statement_timestamp() WHERE x.id = excluded.id" + ); + stmt.execute("COMMIT"); + } + } catch (Exception ex) { + 
fail("Failed due to exception: " + ex.getMessage()); + } + }); + } + + for (Runnable r : runnables) { + futures.add(es.submit(r)); + } + + try { + LOG.info("Waiting for all threads"); + for (Future future : futures) { + future.get(TEST_DURATION_SECS + 10, TimeUnit.SECONDS); + } + } catch (TimeoutException ex) { + LOG.warn("Threads info:\n\n" + ThreadUtil.getAllThreadsInfo()); + fail("Waiting for threads timed out, this is unexpected!"); + } + + try (Statement statement = connection.createStatement()) { + statement.execute("DROP TABLE test"); + } + } +} diff --git a/src/postgres/src/backend/access/transam/xact.c b/src/postgres/src/backend/access/transam/xact.c index 0ee557a39717..ec04f624eb27 100644 --- a/src/postgres/src/backend/access/transam/xact.c +++ b/src/postgres/src/backend/access/transam/xact.c @@ -4156,6 +4156,8 @@ DefineSavepoint(const char *name) /* Normal subtransaction start */ PushTransaction(); s = CurrentTransactionState; /* changed by push */ + elog(DEBUG2, "new sub txn created by savepoint, subtxn_id: %d", + s->subTransactionId); /* * Savepoint names, like the TransactionState block itself, live @@ -4442,7 +4444,7 @@ RollbackToSavepoint(const char *name) elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(xact->blockState)); - YBCRollbackSubTransaction(target->subTransactionId); + YBCRollbackToSubTransaction(target->subTransactionId); } /* @@ -4493,6 +4495,8 @@ BeginInternalSubTransaction(const char *name) /* Normal subtransaction start */ PushTransaction(); s = CurrentTransactionState; /* changed by push */ + elog(DEBUG2, "new sub txn created internally, subtxn_id: %d", + s->subTransactionId); /* * Savepoint names, like the TransactionState block itself, live @@ -4539,7 +4543,6 @@ BeginInternalSubTransaction(const char *name) */ void BeginInternalSubTransactionForReadCommittedStatement() { - elog(DEBUG2, "Begin internal sub txn for statement in READ COMMITTED isolation"); YBFlushBufferedOperations(); TransactionState s = 
CurrentTransactionState; @@ -4556,6 +4559,7 @@ BeginInternalSubTransactionForReadCommittedStatement() { /* Normal subtransaction start */ PushTransaction(); s = CurrentTransactionState; /* changed by push */ + elog(DEBUG2, "new internal sub txn in READ COMMITTED subtxn_id: %d", s->subTransactionId); s->name = MemoryContextStrdup(TopTransactionContext, YB_READ_COMMITTED_INTERNAL_SUB_TXN_NAME); @@ -5158,7 +5162,7 @@ AbortSubTransaction(void) AtEOSubXact_ApplyLauncher(false, s->nestingLevel); } - YBCRollbackSubTransaction(s->subTransactionId); + YBCRollbackToSubTransaction(s->subTransactionId); /* * Restore the upper transaction's read-only state, too. This should be diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index b2e231080b6b..60b69763b716 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -578,10 +578,10 @@ YBCSetActiveSubTransaction(SubTransactionId id) } void -YBCRollbackSubTransaction(SubTransactionId id) +YBCRollbackToSubTransaction(SubTransactionId id) { if (YBSavepointsEnabled()) - HandleYBStatus(YBCPgRollbackSubTransaction(id)); + HandleYBStatus(YBCPgRollbackToSubTransaction(id)); } bool diff --git a/src/postgres/src/include/pg_yb_utils.h b/src/postgres/src/include/pg_yb_utils.h index 0692f0464be3..f1e8eff74f1f 100644 --- a/src/postgres/src/include/pg_yb_utils.h +++ b/src/postgres/src/include/pg_yb_utils.h @@ -278,7 +278,7 @@ extern void YBCAbortTransaction(); extern void YBCSetActiveSubTransaction(SubTransactionId id); -extern void YBCRollbackSubTransaction(SubTransactionId id); +extern void YBCRollbackToSubTransaction(SubTransactionId id); /* * Return true if we want to allow PostgreSQL's own locking. 
This is needed diff --git a/src/postgres/src/test/isolation/yb_pg_isolation_schedule b/src/postgres/src/test/isolation/yb_pg_isolation_schedule index d38deb8a4116..fa28375189a5 100644 --- a/src/postgres/src/test/isolation/yb_pg_isolation_schedule +++ b/src/postgres/src/test/isolation/yb_pg_isolation_schedule @@ -28,4 +28,6 @@ test: insert-conflict-do-update test: multixact-no-deadlock test: read-only-anomaly test: nowait-2 -test: nowait-3 \ No newline at end of file +test: nowait-3 +test: aborted-keyrevoke +test: delete-abort-savept-2 \ No newline at end of file diff --git a/src/yb/client/transaction.cc b/src/yb/client/transaction.cc index 735949dd42f8..000d1c38b47d 100644 --- a/src/yb/client/transaction.cc +++ b/src/yb/client/transaction.cc @@ -120,12 +120,22 @@ Result ChildTransactionData::FromPB(const ChildTransaction YB_DEFINE_ENUM(MetadataState, (kMissing)(kMaybePresent)(kPresent)); +std::string YBSubTransaction::ToString() const { + return Format( + "{ sub_txn_: $0 highest_subtransaction_id_: $1 }", sub_txn_, highest_subtransaction_id_); +} + +bool YBSubTransaction::operator==(const YBSubTransaction& other) const { + return highest_subtransaction_id_ == other.highest_subtransaction_id_ && + sub_txn_ == other.sub_txn_; +} + void YBSubTransaction::SetActiveSubTransaction(SubTransactionId id) { sub_txn_.subtransaction_id = id; highest_subtransaction_id_ = std::max(highest_subtransaction_id_, id); } -Status YBSubTransaction::RollbackSubTransaction(SubTransactionId id) { +Status YBSubTransaction::RollbackToSubTransaction(SubTransactionId id) { // We should abort the range [id, sub_txn_.highest_subtransaction_id]. 
It's possible that we // have created and released savepoints, such that there have been writes with a // subtransaction_id greater than sub_txn_.subtransaction_id, and those should be aborted as @@ -256,7 +266,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { auto transaction = transaction_->shared_from_this(); TRACE_TO(trace_, __func__); { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); auto state = state_.load(std::memory_order_acquire); if (state != TransactionState::kRunning) { return STATUS_FORMAT( @@ -334,18 +344,20 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { SetReadTimeIfNeeded(ops_info->groups.size() > 1 || force_consistent_read); } - ops_info->metadata = { - .transaction = metadata_, - .subtransaction = subtransaction_.active() - ? boost::make_optional(subtransaction_.get()) - : boost::none, - }; + { + ops_info->metadata = { + .transaction = metadata_, + .subtransaction = subtransaction_.active() + ? boost::make_optional(subtransaction_.get()) + : boost::none, + }; + } return true; } void ExpectOperations(size_t count) EXCLUDES(mutex_) override { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); running_requests_ += count; } @@ -365,7 +377,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { CommitCallback commit_callback; { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); running_requests_ -= ops.size(); if (status.ok()) { @@ -636,6 +648,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { auto transaction = transaction_->shared_from_this(); waiters_.push_back([this, transaction](const Status& status) { WARN_NOT_OK(status, "Transaction request failed"); + UNIQUE_LOCK(lock, mutex_); if (status.ok()) { metadata_promise_.set_value(metadata_); } else { @@ -749,7 +762,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { } std::string ToString() EXCLUDES(mutex_) { - std::lock_guard lock(mutex_); + SharedLock 
lock(mutex_); return Format("{ metadata: $0 state: $1 }", metadata_, state_.load(std::memory_order_acquire)); } @@ -793,17 +806,130 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { } void SetActiveSubTransaction(SubTransactionId id) { + VLOG_WITH_PREFIX(4) << "set active sub txn=" << id + << ", subtransaction_=" << subtransaction_.ToString(); return subtransaction_.SetActiveSubTransaction(id); } - Status RollbackSubTransaction(SubTransactionId id) { + std::future SendHeartBeatOnRollback( + const CoarseTimePoint& deadline, const internal::RemoteTabletPtr& status_tablet, + rpc::Rpcs::Handle* handle, + const AbortedSubTransactionSet& aborted_sub_txn_set) { + DCHECK(status_tablet); + + return MakeFuture([&, handle](auto callback) { + manager_->rpcs().RegisterAndStart( + PrepareHeartbeatRPC( + deadline, status_tablet, TransactionStatus::PENDING, + [&, callback, handle](const auto& status, const auto& req, const auto& resp) { + UpdateClock(resp, manager_); + manager_->rpcs().Unregister(handle); + callback(status); + }, + aborted_sub_txn_set), handle); + }); + } + + Status RollbackToSubTransaction(SubTransactionId id, CoarseTimePoint deadline) EXCLUDES(mutex_) { SCHECK( subtransaction_.active(), InternalError, "Attempted to rollback to savepoint before creating any savepoints."); - return subtransaction_.RollbackSubTransaction(id); + + // A heartbeat should be sent (& waited for) to the txn status tablet(s) as part of a rollback. + // This is for updating the list of aborted sub-txns and ensures that other txns don't see false + // conflicts with this txn. 
+ // + // Moreover the following correctness guarantee should always be maintained even if txn 1 and 2 + // are driven from YSQL on different nodes in the cluster: + // + // Txn 1 Txn 2 + // ----- ----- + // + // savepoint a; + // << write some intents + // (i.e., provisional writes) >> + // rollback to a; + // << any statement executed "after" (as per "real time"/ + // "wall clock") should be able to see that the intents of + // the rolled back sub-txn have been aborted and are + // invalid. >> + + #ifndef NDEBUG + YBSubTransaction subtransaction_copy_for_dcheck = subtransaction_; + #endif + + vector> heartbeat_futures; + { + SharedLock lock(mutex_); + if (!ready_) { + // ready_ can be false in 2 situations: + // + // (a) The transaction hasn't been pre-created and hence not registered yet at the status + // tablet. + // (b) The transaction is undergoing a promotion i.e., a txn is being registered at a new + // "global" status tablet. Once a heartbeat is sent to the new status tablet, promotion + // is complete. + // + // We don't have to worry sending the aborted sub-txn id list synchronously to the status + // tablet as part of a rollback in both situations with reasoning as follows for each + // situation - + // + // (1) For situation (a): if the txn hasn't been pre-created it means no ops have been + // performed as part of the txn (because any op would result in Prepare() and would + // wait for the txn to be ready_ before issuing the rpc). + // + // (2) For situation (b), we can't reach it ever during a rollback because - + // i) a promotion can only be triggered by Prepare() of some ops + // ii) the ops will wait for the promotion to complete i.e., a heartbeat is sent to + // the new status tablet of promoted txn + // iii) PgSession::RollbackToSubTransaction() will wait for all in flight ops via + // FlushBufferedOperations(). 
+ return Status::OK(); + } + + // We are making a copy of subtransaction_ so that we can update it with the new range and + // send its aborted sub txn list in heartbeats before updating the actual subtransaction_. + YBSubTransaction subtransaction_copy = subtransaction_; + RETURN_NOT_OK(subtransaction_copy.RollbackToSubTransaction(id)); + + if (old_status_tablet_state_.load(std::memory_order_acquire) == + OldTransactionState::kRunning) { + // Don't have to send a heartbeat if the txn in old status tablet is non-existent (i.e., if + // no promotion activity occurred), or if it is being / has been retired by aborting. + VLOG_WITH_PREFIX(2) << "Sending heartbeat to old status tablet for sub-txn rollback."; + heartbeat_futures.push_back(SendHeartBeatOnRollback( + deadline, old_status_tablet_, &old_rollback_heartbeat_handle_, + subtransaction_copy.get().aborted)); + } + + auto state = state_.load(std::memory_order_acquire); + DCHECK(state != TransactionState::kPromoting); // can't happen, see comment above for details + if (state == TransactionState::kRunning) { + VLOG_WITH_PREFIX(2) << "Sending heartbeat to status tablet for sub-txn rollback."; + heartbeat_futures.push_back(SendHeartBeatOnRollback( + deadline, status_tablet_, &rollback_heartbeat_handle_, + subtransaction_copy.get().aborted)); + } + } + + // Wait for the heartbeat response + for (auto& future : heartbeat_futures) { + RETURN_NOT_OK(future.get()); + } + + #ifndef NDEBUG + DCHECK(subtransaction_copy_for_dcheck == subtransaction_); + #endif + + RETURN_NOT_OK(subtransaction_.RollbackToSubTransaction(id)); + VLOG_WITH_PREFIX(2) << "Aborted sub-txns from " << id + << "; subtransaction_=" << subtransaction_.ToString(); + + return Status::OK(); } - bool HasSubTransactionState() { + bool HasSubTransactionState() EXCLUDES(mutex_) { + SharedLock lock(mutex_); return subtransaction_.active(); } @@ -815,6 +941,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { commit_handle_ = 
manager_->rpcs().InvalidHandle(); abort_handle_ = manager_->rpcs().InvalidHandle(); old_abort_handle_ = manager_->rpcs().InvalidHandle(); + rollback_heartbeat_handle_ = manager_->rpcs().InvalidHandle(); + old_rollback_heartbeat_handle_ = manager_->rpcs().InvalidHandle(); auto metric_entity = manager_->client()->metric_entity(); if (metric_entity) { @@ -875,7 +1003,9 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { rpc::RpcCommandPtr PrepareHeartbeatRPC( CoarseTimePoint deadline, const internal::RemoteTabletPtr& status_tablet, - TransactionStatus status, UpdateTransactionCallback callback) REQUIRES(mutex_) { + TransactionStatus status, UpdateTransactionCallback callback, + std::optional aborted_set_for_rollback_heartbeat = + std::nullopt) { tserver::UpdateTransactionRequestPB req; req.set_tablet_id(status_tablet->tablet_id()); req.set_propagated_hybrid_time(manager_->Now().ToUint64()); @@ -884,8 +1014,11 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); state.set_status(status); - if (subtransaction_.active()) { - subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set()); + if (aborted_set_for_rollback_heartbeat) { + VLOG_WITH_PREFIX(4) << "Setting aborted_set_for_rollback_heartbeat: " + << aborted_set_for_rollback_heartbeat.value().ToString(); + + aborted_set_for_rollback_heartbeat.value().ToPB(state.mutable_aborted()->mutable_set()); } return UpdateTransaction( @@ -974,7 +1107,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { decltype(status_tablet_) status_tablet; decltype(old_status_tablet_) old_status_tablet; { - std::lock_guard lock(mutex_); + SharedLock lock(mutex_); status_tablet = status_tablet_; old_status_tablet = old_status_tablet_; } @@ -1066,7 +1199,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { std::vector tablet_ids; { - std::lock_guard lock(mutex_); + 
std::lock_guard lock(mutex_); tablet_ids.reserve(tablets_.size()); for (const auto& tablet : tablets_) { // We don't check has_metadata here, because intents could be written even in case of @@ -1088,7 +1221,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { auto old_status_tablet_state = old_status_tablet_state_.load(std::memory_order_acquire); if (old_status_tablet_state == OldTransactionState::kAborting) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); VLOG_WITH_PREFIX(1) << "Commit done, but waiting for abort on old status tablet"; cleanup_waiter_ = Waiter(std::bind(&Impl::CommitDone, this, status, response, transaction)); return; @@ -1103,14 +1236,14 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { CommitCallback commit_callback; if (state_.load(std::memory_order_acquire) != TransactionState::kCommitted && actual_status.ok()) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); commit_replicated_ = true; if (running_requests_ != 0) { return; } commit_callback = std::move(commit_callback_); } else { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); commit_callback = std::move(commit_callback_); } VLOG_WITH_PREFIX(4) << "Commit done: " << actual_status; @@ -1131,7 +1264,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { auto old_status_tablet_state = old_status_tablet_state_.load(std::memory_order_acquire); if (old_status_tablet_state == OldTransactionState::kAborting) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); VLOG_WITH_PREFIX(1) << "Abort done, but waiting for abort on old status tablet"; cleanup_waiter_ = Waiter(std::bind(&Impl::AbortDone, this, status, response, transaction)); return; @@ -1158,7 +1291,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { Waiter waiter; { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); cleanup_waiter_.swap(waiter); 
old_status_tablet_state_.store(OldTransactionState::kAborted, std::memory_order_release); } @@ -1250,8 +1383,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { } TransactionStatus HandleLookupTabletCases(const Result& result, - std::vector* waiters) { - std::lock_guard lock(mutex_); + std::vector* waiters) EXCLUDES(mutex_) { + std::lock_guard lock(mutex_); TransactionStatus status; bool notify_waiters; if (state_.load(std::memory_order_acquire) == TransactionState::kPromoting) { @@ -1308,7 +1441,9 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { // Notify all waiters. The transaction will be aborted if it is running and status is not OK. // `lock` must be UNIQUE_LOCK(.., mutex_), and will be released in this function. // If `set_ready` is true and status is OK, `ready_` must be false, and will be set to true. - void NotifyWaitersAndRelease(UniqueLock* lock, Status status, const char* operation, + void NotifyWaitersAndRelease(UniqueLock* lock, + Status status, + const char* operation, SetReady set_ready = SetReady::kFalse) NO_THREAD_SAFETY_ANALYSIS { std::vector waiters; bool trigger_abort = false; @@ -1387,7 +1522,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { rpc::RpcCommandPtr rpc; { - std::lock_guard lock(mutex_); + SharedLock lock(mutex_); internal::RemoteTabletPtr status_tablet; if (!send_to_new_tablet && old_status_tablet_) { @@ -1700,7 +1835,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { } void SetError(const Status& status, const char* operation) EXCLUDES(mutex_) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); SetErrorUnlocked(status, operation); } @@ -1731,7 +1866,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { ChildTransactionDataPB child_txn_data_pb; { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction); } @@ -1791,6 +1926,8 @@ class 
YBTransaction::Impl final : public internal::TxnBatcherIf { rpc::Rpcs::Handle commit_handle_; rpc::Rpcs::Handle abort_handle_; rpc::Rpcs::Handle old_abort_handle_; + rpc::Rpcs::Handle rollback_heartbeat_handle_; + rpc::Rpcs::Handle old_rollback_heartbeat_handle_; // RPC handles for informing participant tablets about a move in transaction status location. std::unordered_map @@ -1807,19 +1944,19 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { typedef std::unordered_map TabletStates; - std::mutex mutex_; + std::shared_mutex mutex_; TabletStates tablets_ GUARDED_BY(mutex_); - std::vector waiters_; + std::vector waiters_ GUARDED_BY(mutex_); // Commit waiter waiting for transaction status move related RPCs to finish. - Waiter commit_waiter_; + Waiter commit_waiter_ GUARDED_BY(mutex_); // Waiter for abort on old transaction status tablet to finish before cleaning up. - Waiter cleanup_waiter_; + Waiter cleanup_waiter_ GUARDED_BY(mutex_); std::atomic old_status_tablet_state_{OldTransactionState::kNone}; - std::promise> metadata_promise_; + std::promise> metadata_promise_ GUARDED_BY(mutex_); std::shared_future> metadata_future_ GUARDED_BY(mutex_); // As of 2021-04-05 running_requests_ reflects number of ops in progress within this transaction // only if no in-transaction operations have failed. @@ -1833,7 +1970,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { // https://github.com/yugabyte/yugabyte-db/issues/7984. size_t running_requests_ GUARDED_BY(mutex_) = 0; // Set to true after commit record is replicated. Used only during transaction sealing. 
- bool commit_replicated_ = false; + bool commit_replicated_ GUARDED_BY(mutex_) = false; scoped_refptr transaction_promotions_; }; @@ -1991,8 +2128,8 @@ void YBTransaction::SetActiveSubTransaction(SubTransactionId id) { return impl_->SetActiveSubTransaction(id); } -Status YBTransaction::RollbackSubTransaction(SubTransactionId id) { - return impl_->RollbackSubTransaction(id); +Status YBTransaction::RollbackToSubTransaction(SubTransactionId id, CoarseTimePoint deadline) { + return impl_->RollbackToSubTransaction(id, deadline); } bool YBTransaction::HasSubTransactionState() { diff --git a/src/yb/client/transaction.h b/src/yb/client/transaction.h index 64a1493ae7da..c9e029ba7f12 100644 --- a/src/yb/client/transaction.h +++ b/src/yb/client/transaction.h @@ -155,7 +155,7 @@ class YBTransaction : public std::enable_shared_from_this { void SetActiveSubTransaction(SubTransactionId id); - Status RollbackSubTransaction(SubTransactionId id); + Status RollbackToSubTransaction(SubTransactionId id, CoarseTimePoint deadline); bool HasSubTransactionState(); @@ -172,10 +172,14 @@ class YBSubTransaction { void SetActiveSubTransaction(SubTransactionId id); - Status RollbackSubTransaction(SubTransactionId id); + Status RollbackToSubTransaction(SubTransactionId id); const SubTransactionMetadata& get(); + std::string ToString() const; + + bool operator==(const YBSubTransaction& other) const; + private: SubTransactionMetadata sub_txn_; diff --git a/src/yb/common/transaction.h b/src/yb/common/transaction.h index ffa947f51fcc..030148b354f8 100644 --- a/src/yb/common/transaction.h +++ b/src/yb/common/transaction.h @@ -238,6 +238,11 @@ struct SubTransactionMetadata { return YB_STRUCT_TO_STRING(subtransaction_id, aborted); } + bool operator==(const SubTransactionMetadata& other) const { + return subtransaction_id == other.subtransaction_id && + aborted == other.aborted; + } + // Returns true if this is the default state, i.e. default subtransaction_id. 
This indicates // whether the client has interacted with savepoints at all in the context of a session. If true, // the client could, for example, skip sending subtransaction-related metadata in RPCs. diff --git a/src/yb/tablet/transaction_coordinator.cc b/src/yb/tablet/transaction_coordinator.cc index f8c77e7b81d3..8f198de82763 100644 --- a/src/yb/tablet/transaction_coordinator.cc +++ b/src/yb/tablet/transaction_coordinator.cc @@ -781,10 +781,10 @@ class TransactionState { } last_touch_ = data.hybrid_time; first_entry_raft_index_ = data.op_id.index; - // TODO -- consider swapping instead of copying here. - if (data.state.aborted().set_size() > aborted_.set_size()) { - aborted_ = data.state.aborted(); - } + + // TODO(savepoints) -- consider swapping instead of copying here. + aborted_ = data.state.aborted(); + return Status::OK(); } diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index 5d664976618c..ac35a5552980 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -51,8 +51,8 @@ service PgClientService { option (yb.rpc.lightweight_method).sides = PROXY; }; rpc ReserveOids(PgReserveOidsRequestPB) returns (PgReserveOidsResponsePB); - rpc RollbackSubTransaction(PgRollbackSubTransactionRequestPB) - returns (PgRollbackSubTransactionResponsePB); + rpc RollbackToSubTransaction(PgRollbackToSubTransactionRequestPB) + returns (PgRollbackToSubTransactionResponsePB); rpc SetActiveSubTransaction(PgSetActiveSubTransactionRequestPB) returns (PgSetActiveSubTransactionResponsePB); rpc TabletServerCount(PgTabletServerCountRequestPB) returns (PgTabletServerCountResponsePB); @@ -375,12 +375,12 @@ message PgReserveOidsResponsePB { uint32 end_oid = 3; } -message PgRollbackSubTransactionRequestPB { +message PgRollbackToSubTransactionRequestPB { uint64 session_id = 1; uint32 sub_transaction_id = 2; } -message PgRollbackSubTransactionResponsePB { +message PgRollbackToSubTransactionResponsePB { AppStatusPB status = 1; } diff --git 
a/src/yb/tserver/pg_client_service.h b/src/yb/tserver/pg_client_service.h index db8eb708ccf7..7386064de3ae 100644 --- a/src/yb/tserver/pg_client_service.h +++ b/src/yb/tserver/pg_client_service.h @@ -49,7 +49,7 @@ namespace tserver { (OpenTable) \ (ReadSequenceTuple) \ (ReserveOids) \ - (RollbackSubTransaction) \ + (RollbackToSubTransaction) \ (SetActiveSubTransaction) \ (TabletServerCount) \ (TruncateTable) \ diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 71915e78db18..80433fe7d7f1 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -525,14 +525,15 @@ Status PgClientSession::DropTablegroup( return status; } -Status PgClientSession::RollbackSubTransaction( - const PgRollbackSubTransactionRequestPB& req, PgRollbackSubTransactionResponsePB* resp, +Status PgClientSession::RollbackToSubTransaction( + const PgRollbackToSubTransactionRequestPB& req, PgRollbackToSubTransactionResponsePB* resp, rpc::RpcContext* context) { VLOG_WITH_PREFIX_AND_FUNC(2) << req.ShortDebugString(); SCHECK(Transaction(PgClientSessionKind::kPlain), IllegalState, Format("Rollback sub transaction $0, when not transaction is running", req.sub_transaction_id())); - return Transaction(PgClientSessionKind::kPlain)->RollbackSubTransaction(req.sub_transaction_id()); + return Transaction(PgClientSessionKind::kPlain)->RollbackToSubTransaction( + req.sub_transaction_id(), context->GetClientDeadline()); } Status PgClientSession::SetActiveSubTransaction( diff --git a/src/yb/tserver/pg_client_session.h b/src/yb/tserver/pg_client_session.h index 7f7d51c6802f..193948fba1c0 100644 --- a/src/yb/tserver/pg_client_session.h +++ b/src/yb/tserver/pg_client_session.h @@ -59,7 +59,7 @@ namespace tserver { (FinishTransaction) \ (InsertSequenceTuple) \ (ReadSequenceTuple) \ - (RollbackSubTransaction) \ + (RollbackToSubTransaction) \ (SetActiveSubTransaction) \ (TruncateTable) \ (UpdateSequenceTuple) \ diff --git 
a/src/yb/tserver/read_query.cc b/src/yb/tserver/read_query.cc index b67ccef41c82..eaa565378aef 100644 --- a/src/yb/tserver/read_query.cc +++ b/src/yb/tserver/read_query.cc @@ -376,6 +376,10 @@ Status ReadQuery::DoPerform() { write.set_unused_tablet_id(""); // For backward compatibility. write_batch.set_deprecated_may_have_metadata(true); write.set_batch_idx(req_->batch_idx()); + if (req_->has_subtransaction() && req_->subtransaction().has_subtransaction_id()) { + write_batch.mutable_subtransaction()->set_subtransaction_id( + req_->subtransaction().subtransaction_id()); + } // TODO(dtxn) write request id RETURN_NOT_OK(leader_peer.peer->tablet()->CreateReadIntents( diff --git a/src/yb/util/uint_set.h b/src/yb/util/uint_set.h index 9ebeb2c9236c..f8f2009f9501 100644 --- a/src/yb/util/uint_set.h +++ b/src/yb/util/uint_set.h @@ -99,6 +99,10 @@ class UnsignedIntSet { return JoinStrings(parts, ", "); } + bool operator==(const UnsignedIntSet& other) const { + return interval_set_ == other.interval_set_; + } + private: using ElementType = uint32_t; using ElementRange = boost::icl::discrete_interval; diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index edcded31af5b..6e9a87ceb7ad 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -231,14 +231,14 @@ class PgClient::Impl { return ResponseStatus(resp); } - Status RollbackSubTransaction(SubTransactionId id) { - tserver::PgRollbackSubTransactionRequestPB req; + Status RollbackToSubTransaction(SubTransactionId id) { + tserver::PgRollbackToSubTransactionRequestPB req; req.set_session_id(session_id_); req.set_sub_transaction_id(id); - tserver::PgRollbackSubTransactionResponsePB resp; + tserver::PgRollbackToSubTransactionResponsePB resp; - RETURN_NOT_OK(proxy_->RollbackSubTransaction(req, &resp, PrepareController())); + RETURN_NOT_OK(proxy_->RollbackToSubTransaction(req, &resp, PrepareController())); return ResponseStatus(resp); } @@ -618,8 +618,8 @@ Status 
PgClient::SetActiveSubTransaction( return impl_->SetActiveSubTransaction(id, options); } -Status PgClient::RollbackSubTransaction(SubTransactionId id) { - return impl_->RollbackSubTransaction(id); +Status PgClient::RollbackToSubTransaction(SubTransactionId id) { + return impl_->RollbackToSubTransaction(id); } Status PgClient::ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) { diff --git a/src/yb/yql/pggate/pg_client.h b/src/yb/yql/pggate/pg_client.h index ca4654c1912e..ab89f4f01cae 100644 --- a/src/yb/yql/pggate/pg_client.h +++ b/src/yb/yql/pggate/pg_client.h @@ -97,7 +97,7 @@ class PgClient { Status SetActiveSubTransaction( SubTransactionId id, tserver::PgPerformOptionsPB* options); - Status RollbackSubTransaction(SubTransactionId id); + Status RollbackToSubTransaction(SubTransactionId id); Status ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req); diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index a073a784b623..8e3409430b7d 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -696,13 +696,15 @@ Status PgSession::SetActiveSubTransaction(SubTransactionId id) { return pg_client_.SetActiveSubTransaction(id, options_ptr); } -Status PgSession::RollbackSubTransaction(SubTransactionId id) { - // TODO(savepoints) -- send async RPC to transaction status tablet, or rely on heartbeater to - // eventually send this metadata. +Status PgSession::RollbackToSubTransaction(SubTransactionId id) { // See comment in SetActiveSubTransaction -- we must flush buffered operations before updating any // SubTransactionMetadata. + // TODO(read committed): performance improvement - + // don't wait for ops which have already been sent ahead by pg_session i.e., to the batcher, then + // rpc layer and beyond. For such ops, rely on aborted sub txn list in status tablet to invalidate + // writes which will be asynchronously written to txn participants. 
RETURN_NOT_OK(FlushBufferedOperations()); - return pg_client_.RollbackSubTransaction(id); + return pg_client_.RollbackToSubTransaction(id); } void PgSession::ResetHasWriteOperationsInDdlMode() { diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 6bda84758bcd..143ffc925fb6 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -312,7 +312,7 @@ class PgSession : public RefCountedThreadSafe { bool ShouldUseFollowerReads() const; Status SetActiveSubTransaction(SubTransactionId id); - Status RollbackSubTransaction(SubTransactionId id); + Status RollbackToSubTransaction(SubTransactionId id); void ResetHasWriteOperationsInDdlMode(); bool HasWriteOperationsInDdlMode() const; diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index b46ee22e3a0e..8e2b115aa7b9 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -1660,9 +1660,9 @@ Status PgApiImpl::SetActiveSubTransaction(SubTransactionId id) { return pg_session_->SetActiveSubTransaction(id); } -Status PgApiImpl::RollbackSubTransaction(SubTransactionId id) { +Status PgApiImpl::RollbackToSubTransaction(SubTransactionId id) { pg_session_->DropBufferedOperations(); - return pg_session_->RollbackSubTransaction(id); + return pg_session_->RollbackToSubTransaction(id); } double PgApiImpl::GetTransactionPriority() const { diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index e6135f4b472c..2b0e882171cb 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -528,7 +528,7 @@ class PgApiImpl { Status ExitSeparateDdlTxnMode(); void ClearSeparateDdlTxnMode(); Status SetActiveSubTransaction(SubTransactionId id); - Status RollbackSubTransaction(SubTransactionId id); + Status RollbackToSubTransaction(SubTransactionId id); double GetTransactionPriority() const; TxnPriorityRequirement GetTransactionPriorityType() const; diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc 
index 308c56340b2f..9ea1f80c772b 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -1009,8 +1009,8 @@ YBCStatus YBCPgSetActiveSubTransaction(uint32_t id) { return ToYBCStatus(pgapi->SetActiveSubTransaction(id)); } -YBCStatus YBCPgRollbackSubTransaction(uint32_t id) { - return ToYBCStatus(pgapi->RollbackSubTransaction(id)); +YBCStatus YBCPgRollbackToSubTransaction(uint32_t id) { + return ToYBCStatus(pgapi->RollbackToSubTransaction(id)); } //------------------------------------------------------------------------------------------------ diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index dc8268978132..1912b3fb22b3 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -473,7 +473,7 @@ bool YBCPgHasWriteOperationsInDdlTxnMode(); YBCStatus YBCPgExitSeparateDdlTxnMode(); void YBCPgClearSeparateDdlTxnMode(); YBCStatus YBCPgSetActiveSubTransaction(uint32_t id); -YBCStatus YBCPgRollbackSubTransaction(uint32_t id); +YBCStatus YBCPgRollbackToSubTransaction(uint32_t id); double YBCGetTransactionPriority(); TxnPriorityRequirement YBCGetTransactionPriorityType(); diff --git a/src/yb/yql/pgwrapper/pg_mini-test.cc b/src/yb/yql/pgwrapper/pg_mini-test.cc index 118845f33cef..321c16e680cc 100644 --- a/src/yb/yql/pgwrapper/pg_mini-test.cc +++ b/src/yb/yql/pgwrapper/pg_mini-test.cc @@ -2575,7 +2575,10 @@ TEST_F( RunManyConcurrentReadersTest(); } -TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(TestSerializableStrongReadLockNotAborted)) { +// TODO(savepoint): This test will fail until issue #9587 is fixed. It worked earlier but +// is expected to fail, as pointed out in https://phabricator.dev.yugabyte.com/D17177. +// Change the macro back to YB_DISABLE_TEST_IN_TSAN when re-enabling. 
+TEST_F(PgMiniTest, YB_DISABLE_TEST(TestSerializableStrongReadLockNotAborted)) { auto conn = ASSERT_RESULT(Connect()); ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY, b int) SPLIT INTO 1 TABLETS")); for (int i = 0; i < 100; ++i) {