From 43900f94b7d699a62ec914a1f6cb16244d872bb2 Mon Sep 17 00:00:00 2001 From: Piyush Jain Date: Thu, 22 Sep 2022 12:41:42 -0700 Subject: [PATCH] [#14202] YSQL: Avoid rpc from YSQL to local tserver when switching sub-txns Summary: Currently, YSQL sends the PgSetActiveSubTransactionRequestPB rpc to the local tserver process' Pg client (pg_client_session.cc) when it wants to switch the active sub transaction id. YSQL will switch the active sub transaction id for any of the following: user savepoint operations, an internal savepoint created for exception handling, or an internal savepoint created before execution of a statement in READ COMMITTED isolation level as explained in 9f2cc7f. The PgSetActiveSubTransactionRequestPB rpc was added in c5f51258 as part of larger PgClient changes. After that change, YSQL sends all rpcs for DMLs to the PgClientSession on the local tserver process. Distributed transaction creation and management are handled in the PgClientSession and are not visible to the YSQL backends. Before c5f51258, YSQL backends managed the lifecycle of distributed transactions and sub-txn operations were local, i.e., they did not require any rpcs. However, a separate rpc is not required for setting the active sub transaction id. The active sub transaction id can be piggybacked in each PgPerformRequestPB through the PgPerformOptionsPB sub-field and the Pg client session on the tserver can change the active sub transaction id before handling the PgPerformRequestPB. Removal of this rpc is required as a precursor to solving #12494. As part of #12494, we can see that postgres allows changing the transaction isolation level after a "BEGIN" statement. In YSQL, if the "BEGIN" statement starts in READ COMMITTED isolation, every future statement will acquire an internal savepoint before executing the statement (refer to 9f2cc7fe). Without this diff, the internal savepoint creation results in a PgSetActiveSubTransactionRequestPB rpc which in turn starts a distributed transaction. 
This sets the isolation level in stone and disallows any future modification of the transaction isolation (via the "SET TRANSACTION ISOLATION LEVEL" statement) even if no other queries have been executed. It results in the "Attempt to change isolation level of running transaction..." error message in pg_client_session.cc. NOTE: The RollbackToSubTransaction rpc for rolling back to a sub transaction id stays and will remain synchronous. Test Plan: ./yb_build.sh --java-test org.yb.pgsql.TestPgIsolationRegress#isolationRegres Reviewers: mtakahara, rsami Reviewed By: mtakahara, rsami Subscribers: zyu, yql, bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D19902 --- src/yb/tserver/pg_client.proto | 5 ++ src/yb/tserver/pg_client_session.cc | 90 ++++++++++++++++++++++------- src/yb/yql/pggate/pg_client.cc | 10 +++- src/yb/yql/pggate/pg_client.h | 2 +- src/yb/yql/pggate/pg_session.cc | 27 ++++----- src/yb/yql/pggate/pg_txn_manager.cc | 8 +++ src/yb/yql/pggate/pg_txn_manager.h | 2 + 7 files changed, 102 insertions(+), 42 deletions(-) diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index a0e3f7c9aeee..9a40144def47 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -53,6 +53,7 @@ service PgClientService { rpc ReserveOids(PgReserveOidsRequestPB) returns (PgReserveOidsResponsePB); rpc RollbackToSubTransaction(PgRollbackToSubTransactionRequestPB) returns (PgRollbackToSubTransactionResponsePB); + // DEPRECATED rpc SetActiveSubTransaction(PgSetActiveSubTransactionRequestPB) returns (PgSetActiveSubTransactionResponsePB); rpc TabletServerCount(PgTabletServerCountRequestPB) returns (PgTabletServerCountResponsePB); @@ -354,6 +355,7 @@ message PgPerformOptionsPB { bool force_global_transaction = 13; string namespace_id = 14; bool use_xcluster_database_consistency = 15; + uint32 active_sub_transaction_id = 16; } message PgPerformRequestPB { @@ -385,12 +387,14 @@ message 
PgReserveOidsResponsePB { message PgRollbackToSubTransactionRequestPB { uint64 session_id = 1; uint32 sub_transaction_id = 2; + PgPerformOptionsPB options = 3; } message PgRollbackToSubTransactionResponsePB { AppStatusPB status = 1; } +// DEPRECATED message PgSetActiveSubTransactionRequestPB { uint64 session_id = 1; uint32 sub_transaction_id = 2; @@ -398,6 +402,7 @@ message PgSetActiveSubTransactionRequestPB { PgPerformOptionsPB options = 3; } +// DEPRECATED message PgSetActiveSubTransactionResponsePB { AppStatusPB status = 1; } diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 6ac901ff4351..5a13b0fcf212 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -537,42 +537,84 @@ Status PgClientSession::RollbackToSubTransaction( const PgRollbackToSubTransactionRequestPB& req, PgRollbackToSubTransactionResponsePB* resp, rpc::RpcContext* context) { VLOG_WITH_PREFIX_AND_FUNC(2) << req.ShortDebugString(); + DCHECK_GE(req.sub_transaction_id(), 0); + /* * Currently we do not support a transaction block that has both DDL and DML statements (we * support it syntactically but not semantically). Thus, when a DDL is encountered in a * transaction block, a separate transaction is created for the DDL statement, which is * committed at the end of that statement. This is why there are 2 session objects here, one * corresponds to the DML transaction, and the other to a possible separate transaction object - * created for the DDL. However, subtransaction-id increases across both sessions. Also, - * it is not possible to rollback to a savepoint created for the DML transaction from the DDL - * statement, i.e. the following sequence of events is not possible: + * created for the DDL. However, subtransaction-id increases across both sessions in YSQL. + * + * Rolling back to a savepoint from either the DDL or DML txn will only revert any writes/ lock + * acquisitions done as part of that txn. 
Consider the below example, the "Rollback to + * Savepoint 1" will only revert things done in the DDL's context and not the commands that follow + * Savepoint 1 in the DML's context. + * * -- Start DML * ---- Commands... * ---- Savepoint 1 + * ---- Commands... * ---- Start DDL - * ------ Commands + * ------ Commands... * ------ Savepoint 2 - * ------ Commands - * ------ Rollback to Savepoint 1 -- Not possible, can only rollback to Savepoint 1. - * ---- DDL committed at this point - * ---- Rollback to Savepoint 2 -- Again not possible, the DDL is already committed. - * The above is not possible because we do not allow multiple statements in a single DDL txn. - * Because of the above properties, when we need to rollback to a savepoint, the subtransaction-id - * can only have been part of one session. + * ------ Commands... + * ------ Rollback to Savepoint 1 */ - for (auto& session : sessions_) { - auto transaction = session.transaction; - if (transaction - && transaction->HasSubTransaction(req.sub_transaction_id())) { - return transaction->RollbackToSubTransaction(req.sub_transaction_id(), - context->GetClientDeadline()); - } + auto kind = PgClientSessionKind::kPlain; + + if (req.has_options() && req.options().ddl_mode()) + kind = PgClientSessionKind::kDdl; + + auto transaction = Transaction(kind); + + if (!transaction) { + LOG_WITH_PREFIX_AND_FUNC(WARNING) + << "RollbackToSubTransaction " << req.sub_transaction_id() + << " when no distributed transaction of kind" + << (kind == PgClientSessionKind::kPlain ? "kPlain" : "kDdl") + << " is running. This can happen if no distributed transaction has been started yet" + << " e.g., BEGIN; SAVEPOINT a; ROLLBACK TO a;"; + return Status::OK(); + } + + // Before rolling back to req.sub_transaction_id(), set the active sub transaction id to be the + // same as that in the request. 
This is necessary because of the following reasoning: + // + // ROLLBACK TO SAVEPOINT leads to many calls to YBCRollbackToSubTransaction(), not just 1: + // Assume the current sub-txns are from 1 to 10 and then a ROLLBACK TO X is performed where + // X corresponds to sub-txn 5. In this case, 6 calls are made to + // YBCRollbackToSubTransaction() with sub-txn ids: 5, 10, 9, 8, 7, 6, 5. The first call is + // made in RollbackToSavepoint() but the latter 5 are redundant and called from the + // AbortSubTransaction() handling for each sub-txn. + // + // Now consider the following scenario: + // 1. In READ COMMITTED isolation, a new internal sub transaction is created at the start of + // each statement (even a ROLLBACK TO). So, a ROLLBACK TO X like above, will first create a + // new internal sub-txn 11. + // 2. YBCRollbackToSubTransaction() will be called 7 times on sub-txn ids: + // 5, 11, 10, 9, 8, 7, 6 + // + // So, it is necessary to first bump the active-sub txn id to 11 and then perform the rollback. + // Otherwise, an error will be thrown that the sub-txn doesn't exist when + // YBCRollbackToSubTransaction() is called for sub-txn id 11. + + if (req.has_options()) { + DCHECK_GE(req.options().active_sub_transaction_id(), 0); + transaction->SetActiveSubTransaction(req.options().active_sub_transaction_id()); } - return STATUS(IllegalState, - Format("Rollback sub transaction $0, when no transaction is running", - req.sub_transaction_id())); + + RSTATUS_DCHECK(transaction->HasSubTransaction(req.sub_transaction_id()), InvalidArgument, + Format("Transaction of kind $0 doesn't have sub transaction $1", + kind == PgClientSessionKind::kPlain ? "kPlain" : "kDdl", + req.sub_transaction_id())); + + return transaction->RollbackToSubTransaction(req.sub_transaction_id(), + context->GetClientDeadline()); } +// The below RPC is DEPRECATED. 
Status PgClientSession::SetActiveSubTransaction( const PgSetActiveSubTransactionRequestPB& req, PgSetActiveSubTransactionResponsePB* resp, rpc::RpcContext* context) { @@ -594,6 +636,7 @@ Status PgClientSession::SetActiveSubTransaction( Format("Set active sub transaction $0, when no transaction is running", req.sub_transaction_id())); + DCHECK_GE(req.sub_transaction_id(), 0); transaction->SetActiveSubTransaction(req.sub_transaction_id()); return Status::OK(); } @@ -786,6 +829,11 @@ PgClientSession::SetupSession(const PgPerformRequestPB& req, CoarseTimePoint dea session->SetDeadline(deadline); + if (transaction) { + DCHECK_GE(options.active_sub_transaction_id(), 0); + transaction->SetActiveSubTransaction(options.active_sub_transaction_id()); + } + return std::make_pair(session, used_read_time); } diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index 935f86729826..5ecf72ec5e5c 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -246,9 +246,12 @@ class PgClient::Impl { return ResponseStatus(resp); } - Status RollbackToSubTransaction(SubTransactionId id) { + Status RollbackToSubTransaction(SubTransactionId id, tserver::PgPerformOptionsPB* options) { tserver::PgRollbackToSubTransactionRequestPB req; req.set_session_id(session_id_); + if (options) { + options->Swap(req.mutable_options()); + } req.set_sub_transaction_id(id); tserver::PgRollbackToSubTransactionResponsePB resp; @@ -658,8 +661,9 @@ Status PgClient::SetActiveSubTransaction( return impl_->SetActiveSubTransaction(id, options); } -Status PgClient::RollbackToSubTransaction(SubTransactionId id) { - return impl_->RollbackToSubTransaction(id); +Status PgClient::RollbackToSubTransaction( + SubTransactionId id, tserver::PgPerformOptionsPB* options) { + return impl_->RollbackToSubTransaction(id, options); } Status PgClient::ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) { diff --git a/src/yb/yql/pggate/pg_client.h 
b/src/yb/yql/pggate/pg_client.h index 4bdf4372bff4..ddfbea6029fe 100644 --- a/src/yb/yql/pggate/pg_client.h +++ b/src/yb/yql/pggate/pg_client.h @@ -98,7 +98,7 @@ class PgClient { Status SetActiveSubTransaction( SubTransactionId id, tserver::PgPerformOptionsPB* options); - Status RollbackToSubTransaction(SubTransactionId id); + Status RollbackToSubTransaction(SubTransactionId id, tserver::PgPerformOptionsPB* options); Status ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req); diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index b8e94bcb03e6..e8ebcdf7d28c 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -223,9 +223,9 @@ class PgSession::RunHelper { const auto row_mark_type = GetRowMarkType(*op); const auto txn_priority_requirement = - pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED || - RowMarkNeedsHigherPriority(row_mark_type) - ? kHigherPriorityRange : kLowerPriorityRange; + pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED + ? kHighestPriority : + (RowMarkNeedsHigherPriority(row_mark_type) ? kHigherPriorityRange : kLowerPriorityRange); read_only = read_only && !IsValidRowMarkType(row_mark_type); return pg_session_.pg_txn_manager_->CalculateIsolation( @@ -721,20 +721,11 @@ Status PgSession::SetActiveSubTransaction(SubTransactionId id) { // ensuring that previous operations use previous SubTransactionMetadata. If we do not flush here, // already queued operations may incorrectly use this newly modified SubTransactionMetadata when // they are eventually sent to DocDB. 
+ VLOG(4) << "SetActiveSubTransactionId " << id; RETURN_NOT_OK(FlushBufferedOperations()); - tserver::PgPerformOptionsPB* options_ptr = nullptr; - tserver::PgPerformOptionsPB options; - if (pg_txn_manager_->GetIsolationLevel() == IsolationLevel::NON_TRANSACTIONAL) { - auto txn_priority_requirement = kLowerPriorityRange; - if (pg_txn_manager_->GetPgIsolationLevel() == PgIsolationLevel::READ_COMMITTED) { - txn_priority_requirement = kHighestPriority; - } - RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation( - false /* read_only_op */, txn_priority_requirement)); - options_ptr = &options; - pg_txn_manager_->SetupPerformOptions(&options); - } - return pg_client_.SetActiveSubTransaction(id, options_ptr); + pg_txn_manager_->SetActiveSubTransactionId(id); + + return Status::OK(); } Status PgSession::RollbackToSubTransaction(SubTransactionId id) { @@ -745,7 +736,9 @@ Status PgSession::RollbackToSubTransaction(SubTransactionId id) { // rpc layer and beyond. For such ops, rely on aborted sub txn list in status tablet to invalidate // writes which will be asynchronously written to txn participants. 
RETURN_NOT_OK(FlushBufferedOperations()); - return pg_client_.RollbackToSubTransaction(id); + tserver::PgPerformOptionsPB options; + pg_txn_manager_->SetupPerformOptions(&options); + return pg_client_.RollbackToSubTransaction(id, &options); } void PgSession::ResetHasWriteOperationsInDdlMode() { diff --git a/src/yb/yql/pggate/pg_txn_manager.cc b/src/yb/yql/pggate/pg_txn_manager.cc index dd212ea30892..21f39ae4ea63 100644 --- a/src/yb/yql/pggate/pg_txn_manager.cc +++ b/src/yb/yql/pggate/pg_txn_manager.cc @@ -329,6 +329,10 @@ Status PgTxnManager::RestartReadPoint() { return Status::OK(); } +void PgTxnManager::SetActiveSubTransactionId(SubTransactionId id) { + active_sub_transaction_id_ = id; +} + Status PgTxnManager::CommitTransaction() { return FinishTransaction(Commit::kTrue); } @@ -369,6 +373,7 @@ void PgTxnManager::ResetTxnAndSession() { priority_ = 0; in_txn_limit_ = HybridTime(); ++txn_serial_no_; + active_sub_transaction_id_ = 0; enable_follower_reads_ = false; read_only_ = false; @@ -409,10 +414,13 @@ std::string PgTxnManager::TxnStateDebugStr() const { uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) { if (!ddl_mode_ && !txn_in_progress_) { ++txn_serial_no_; + active_sub_transaction_id_ = 0; } options->set_isolation(isolation_level_); options->set_ddl_mode(ddl_mode_); options->set_txn_serial_no(txn_serial_no_); + options->set_active_sub_transaction_id(active_sub_transaction_id_); + if (txn_in_progress_ && in_txn_limit_) { options->set_in_txn_limit_ht(in_txn_limit_.ToUint64()); } diff --git a/src/yb/yql/pggate/pg_txn_manager.h b/src/yb/yql/pggate/pg_txn_manager.h index 6980bf580194..ad7af5382c25 100644 --- a/src/yb/yql/pggate/pg_txn_manager.h +++ b/src/yb/yql/pggate/pg_txn_manager.h @@ -62,6 +62,7 @@ class PgTxnManager : public RefCountedThreadSafe { Status RestartTransaction(); Status ResetTransactionReadPoint(); Status RestartReadPoint(); + void SetActiveSubTransactionId(SubTransactionId id); Status CommitTransaction(); Status 
AbortTransaction(); Status SetPgIsolationLevel(int isolation); @@ -106,6 +107,7 @@ class PgTxnManager : public RefCountedThreadSafe { bool txn_in_progress_ = false; IsolationLevel isolation_level_ = IsolationLevel::NON_TRANSACTIONAL; uint64_t txn_serial_no_ = 0; + SubTransactionId active_sub_transaction_id_ = 0; bool need_restart_ = false; bool need_defer_read_point_ = false; tserver::ReadTimeManipulation read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;