[#14202] YSQL: Avoid rpc from YSQL to local tserver when switching sub-txns

Summary:
Currently, YSQL sends the PgSetActiveSubTransactionRequestPB rpc to the local
tserver process' Pg client (pg_client_session.cc) whenever it wants to switch
the active sub transaction id. YSQL switches the active sub transaction id for
any of the following: user savepoint operations, an internal savepoint created
for exception handling, or an internal savepoint created before the execution
of a statement in READ COMMITTED isolation level, as explained in 9f2cc7f.

The PgSetActiveSubTransactionRequestPB rpc was added in c5f5125 as part
of larger PgClient changes. After that change, YSQL sends all rpcs for DMLs to
the PgClientSession on the local tserver process. Distributed transaction
creation and management are handled in the PgClientSession and are not visible
to the YSQL backends. Before that change, YSQL backends managed the lifecycle
of distributed transactions and sub-txn operations were local, i.e., they did
not require any rpcs.

However, a separate rpc is not required for setting the active sub transaction id.
The active sub transaction id can be piggybacked in each PgPerformRequestPB
through the PgPerformOptionsPB sub-field and the Pg client session on the
tserver can change the active sub transaction id before handling the
PgPerformRequestPB.
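
The piggybacking idea above can be sketched as a small, self-contained model.
This is only an illustration under stated assumptions: ToyPerformOptions,
ToyPerformRequest, ToyBackend and ToyServerSession are hypothetical stand-ins
for PgPerformOptionsPB, PgPerformRequestPB, the YSQL backend state and the
PgClientSession, and they model nothing beyond the new
active_sub_transaction_id field.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for PgPerformOptionsPB (only the new field is modelled).
struct ToyPerformOptions {
  uint32_t active_sub_transaction_id = 0;
};

// Hypothetical stand-in for PgPerformRequestPB.
struct ToyPerformRequest {
  ToyPerformOptions options;
  std::string op;  // placeholder for the batched operations
};

// Backend side: switching the active sub-txn only updates local state.
class ToyBackend {
 public:
  void SetActiveSubTransactionId(uint32_t id) { active_sub_txn_id_ = id; }

  // Every outgoing perform request carries the current id in its options.
  ToyPerformRequest MakePerformRequest(std::string op) const {
    ToyPerformRequest req;
    req.options.active_sub_transaction_id = active_sub_txn_id_;
    req.op = std::move(op);
    return req;
  }

 private:
  uint32_t active_sub_txn_id_ = 0;
};

// Tserver side: the session applies the piggybacked id before handling the ops.
class ToyServerSession {
 public:
  void Perform(const ToyPerformRequest& req) {
    ++rpcs_received_;
    active_sub_txn_id_ = req.options.active_sub_transaction_id;
    applied_.emplace_back(active_sub_txn_id_, req.op);
  }

  int rpcs_received() const { return rpcs_received_; }
  uint32_t active_sub_transaction_id() const { return active_sub_txn_id_; }
  // (sub-txn id, op) pairs in the order they were handled.
  const std::vector<std::pair<uint32_t, std::string>>& applied() const {
    return applied_;
  }

 private:
  int rpcs_received_ = 0;
  uint32_t active_sub_txn_id_ = 0;
  std::vector<std::pair<uint32_t, std::string>> applied_;
};
```

In this model, calling SetActiveSubTransactionId() any number of times costs no
rpc; the server learns the latest id only when the next perform request arrives.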

Removal of this rpc is required as a precursor to solving #12494. As noted in
#12494, postgres allows changing the transaction isolation level after a "BEGIN"
statement. In YSQL, if the "BEGIN" statement starts in READ COMMITTED isolation,
every future statement will acquire an internal savepoint before executing the
statement (refer 9f2cc7f). Without this diff, the internal savepoint creation
results in a PgSetActiveSubTransactionRequestPB rpc, which in turn starts a
distributed transaction. This sets the isolation level in stone and does not
allow any future modification of the transaction isolation (via the
"SET TRANSACTION ISOLATION LEVEL" statement), even if no other queries have been
executed. It results in the "Attempt to change isolation level of running
transaction..." error message in pg_client_session.cc.
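
To make the failure mode concrete, here is a deliberately simplified model of
it. ToySession, ToyIsolation and CanChangeIsolation are hypothetical names
invented for this sketch; the real check lives in pg_client_session.cc and is
more involved. The only assumption carried over from the text is that the first
request needing a distributed transaction fixes its isolation level, and a later
request with a different level is rejected.

```cpp
#include <cassert>
#include <stdexcept>

enum class ToyIsolation { kReadCommitted, kRepeatableRead };

// Toy model of a tserver-side session (hypothetical, not the real PgClientSession).
class ToySession {
 public:
  // Any request that needs a distributed transaction goes through here.
  void EnsureTransaction(ToyIsolation requested) {
    if (!has_txn_) {
      // The first such request starts the transaction and fixes its isolation.
      has_txn_ = true;
      isolation_ = requested;
      return;
    }
    if (isolation_ != requested) {
      throw std::runtime_error(
          "Attempt to change isolation level of running transaction");
    }
  }

  bool has_transaction() const { return has_txn_; }

 private:
  bool has_txn_ = false;
  ToyIsolation isolation_ = ToyIsolation::kReadCommitted;
};

// Simulates: BEGIN in READ COMMITTED; internal savepoint;
// SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; first real statement.
// Returns true if the first real statement is accepted.
bool CanChangeIsolation(bool savepoint_sends_rpc) {
  ToySession session;
  if (savepoint_sends_rpc) {
    // Old behaviour: the savepoint rpc starts a READ COMMITTED transaction.
    session.EnsureTransaction(ToyIsolation::kReadCommitted);
  }
  try {
    session.EnsureTransaction(ToyIsolation::kRepeatableRead);
    return true;
  } catch (const std::runtime_error&) {
    return false;
  }
}
```

With savepoint_sends_rpc set to true the model rejects the isolation change;
with it set to false (the savepoint is purely local) the change goes through.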

NOTE: The RollbackToSubTransaction rpc for rolling back to a sub transaction id stays
and will remain synchronous.
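
Although the rollback rpc stays, this diff adds PgPerformOptionsPB to its
request so that the tserver can bump the active sub transaction id before
rolling back. The sketch below models that ordering. ToyTransaction and
HandleRollback are hypothetical, and the rule that a sub-txn "exists" once the
active id has reached it is an assumption made for illustration, not the real
HasSubTransaction() logic.

```cpp
#include <cassert>
#include <cstdint>

// Toy transaction. Assumption: a sub-txn exists once the active id has reached it.
class ToyTransaction {
 public:
  void SetActiveSubTransaction(uint32_t id) {
    active_ = id;
    if (id > highest_seen_) highest_seen_ = id;
  }

  bool HasSubTransaction(uint32_t id) const { return id <= highest_seen_; }

  // Returns false if the target sub-txn is unknown to this transaction.
  bool RollbackToSubTransaction(uint32_t id) {
    if (!HasSubTransaction(id)) return false;
    last_rollback_target_ = id;
    return true;
  }

  uint32_t active() const { return active_; }
  uint32_t last_rollback_target() const { return last_rollback_target_; }

 private:
  uint32_t active_ = 0;
  uint32_t highest_seen_ = 0;
  uint32_t last_rollback_target_ = 0;
};

// Mirrors the order used in the diff: first apply the piggybacked active id
// (if the request carried options), then perform the rollback.
bool HandleRollback(ToyTransaction* txn, uint32_t target,
                    const uint32_t* piggybacked_active_id) {
  if (piggybacked_active_id != nullptr) {
    txn->SetActiveSubTransaction(*piggybacked_active_id);
  }
  return txn->RollbackToSubTransaction(target);
}
```

In READ COMMITTED, a ROLLBACK TO statement first creates a new internal
sub-txn locally; without the piggybacked id, the tserver in this model would
not know about that sub-txn when asked to roll it back.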

Test Plan: ./yb_build.sh --java-test org.yb.pgsql.TestPgIsolationRegress#isolationRegres

Reviewers: mtakahara, rsami

Reviewed By: mtakahara, rsami

Subscribers: zyu, yql, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D19902
pkj415 committed Oct 18, 2022
1 parent d132aef commit 43900f9
Showing 7 changed files with 102 additions and 42 deletions.
5 changes: 5 additions & 0 deletions src/yb/tserver/pg_client.proto
@@ -53,6 +53,7 @@ service PgClientService {
rpc ReserveOids(PgReserveOidsRequestPB) returns (PgReserveOidsResponsePB);
rpc RollbackToSubTransaction(PgRollbackToSubTransactionRequestPB)
returns (PgRollbackToSubTransactionResponsePB);
// DEPRECATED
rpc SetActiveSubTransaction(PgSetActiveSubTransactionRequestPB)
returns (PgSetActiveSubTransactionResponsePB);
rpc TabletServerCount(PgTabletServerCountRequestPB) returns (PgTabletServerCountResponsePB);
@@ -354,6 +355,7 @@ message PgPerformOptionsPB {
bool force_global_transaction = 13;
string namespace_id = 14;
bool use_xcluster_database_consistency = 15;
uint32 active_sub_transaction_id = 16;
}

message PgPerformRequestPB {
@@ -385,19 +387,22 @@ message PgReserveOidsResponsePB {
message PgRollbackToSubTransactionRequestPB {
uint64 session_id = 1;
uint32 sub_transaction_id = 2;
PgPerformOptionsPB options = 3;
}

message PgRollbackToSubTransactionResponsePB {
AppStatusPB status = 1;
}

// DEPRECATED
message PgSetActiveSubTransactionRequestPB {
uint64 session_id = 1;
uint32 sub_transaction_id = 2;
// Used to create transaction when we did not have one yet.
PgPerformOptionsPB options = 3;
}

// DEPRECATED
message PgSetActiveSubTransactionResponsePB {
AppStatusPB status = 1;
}
90 changes: 69 additions & 21 deletions src/yb/tserver/pg_client_session.cc
@@ -537,42 +537,84 @@ Status PgClientSession::RollbackToSubTransaction(
const PgRollbackToSubTransactionRequestPB& req, PgRollbackToSubTransactionResponsePB* resp,
rpc::RpcContext* context) {
VLOG_WITH_PREFIX_AND_FUNC(2) << req.ShortDebugString();
DCHECK_GE(req.sub_transaction_id(), 0);

/*
* Currently we do not support a transaction block that has both DDL and DML statements (we
* support it syntactically but not semantically). Thus, when a DDL is encountered in a
* transaction block, a separate transaction is created for the DDL statement, which is
* committed at the end of that statement. This is why there are 2 session objects here, one
* corresponds to the DML transaction, and the other to a possible separate transaction object
* created for the DDL. However, subtransaction-id increases across both sessions. Also,
* it is not possible to rollback to a savepoint created for the DML transaction from the DDL
* statement, i.e. the following sequence of events is not possible:
* created for the DDL. However, subtransaction-id increases across both sessions in YSQL.
*
* Rolling back to a savepoint from either the DDL or DML txn will only revert any writes/ lock
* acquisitions done as part of that txn. Consider the below example, the "Rollback to
* Savepoint 1" will only revert things done in the DDL's context and not the commands that follow
* Savepoint 1 in the DML's context.
*
* -- Start DML
* ---- Commands...
* ---- Savepoint 1
* ---- Commands...
* ---- Start DDL
* ------ Commands
* ------ Commands...
* ------ Savepoint 2
* ------ Commands
* ------ Rollback to Savepoint 1 -- Not possible, can only rollback to Savepoint 1.
* ---- DDL committed at this point
* ---- Rollback to Savepoint 2 -- Again not possible, the DDL is already committed.
* The above is not possible because we do not allow multiple statements in a single DDL txn.
* Because of the above properties, when we need to rollback to a savepoint, the subtransaction-id
* can only have been part of one session.
* ------ Commands...
* ------ Rollback to Savepoint 1
*/
for (auto& session : sessions_) {
auto transaction = session.transaction;
if (transaction
&& transaction->HasSubTransaction(req.sub_transaction_id())) {
return transaction->RollbackToSubTransaction(req.sub_transaction_id(),
context->GetClientDeadline());
}
auto kind = PgClientSessionKind::kPlain;

if (req.has_options() && req.options().ddl_mode())
kind = PgClientSessionKind::kDdl;

auto transaction = Transaction(kind);

if (!transaction) {
LOG_WITH_PREFIX_AND_FUNC(WARNING)
<< "RollbackToSubTransaction " << req.sub_transaction_id()
<< " when no distributed transaction of kind "
<< (kind == PgClientSessionKind::kPlain ? "kPlain" : "kDdl")
<< " is running. This can happen if no distributed transaction has been started yet"
<< " e.g., BEGIN; SAVEPOINT a; ROLLBACK TO a;";
return Status::OK();
}

// Before rolling back to req.sub_transaction_id(), set the active sub transaction id to be the
// same as that in the request. This is necessary because of the following reasoning:
//
// ROLLBACK TO SAVEPOINT leads to many calls to YBCRollbackToSubTransaction(), not just 1:
// Assume the current sub-txns are from 1 to 10 and then a ROLLBACK TO X is performed where
// X corresponds to sub-txn 5. In this case, 6 calls are made to
// YBCRollbackToSubTransaction() with sub-txn ids: 5, 10, 9, 8, 7, 6, 5. The first call is
// made in RollbackToSavepoint() but the latter 5 are redundant and called from the
// AbortSubTransaction() handling for each sub-txn.
//
// Now consider the following scenario:
// 1. In READ COMMITTED isolation, a new internal sub transaction is created at the start of
// each statement (even a ROLLBACK TO). So, a ROLLBACK TO X like above, will first create a
// new internal sub-txn 11.
// 2. YBCRollbackToSubTransaction() will be called 7 times on sub-txn ids:
// 5, 11, 10, 9, 8, 7, 6
//
// So, it is necessary to first bump the active sub-txn id to 11 and then perform the rollback.
// Otherwise, an error will be thrown that the sub-txn doesn't exist when
// YBCRollbackToSubTransaction() is called for sub-txn id 11.

if (req.has_options()) {
DCHECK_GE(req.options().active_sub_transaction_id(), 0);
transaction->SetActiveSubTransaction(req.options().active_sub_transaction_id());
}
return STATUS(IllegalState,
Format("Rollback sub transaction $0, when no transaction is running",
req.sub_transaction_id()));

RSTATUS_DCHECK(transaction->HasSubTransaction(req.sub_transaction_id()), InvalidArgument,
Format("Transaction of kind $0 doesn't have sub transaction $1",
kind == PgClientSessionKind::kPlain ? "kPlain" : "kDdl",
req.sub_transaction_id()));

return transaction->RollbackToSubTransaction(req.sub_transaction_id(),
context->GetClientDeadline());
}

// The below RPC is DEPRECATED.
Status PgClientSession::SetActiveSubTransaction(
const PgSetActiveSubTransactionRequestPB& req, PgSetActiveSubTransactionResponsePB* resp,
rpc::RpcContext* context) {
@@ -594,6 +636,7 @@ Status PgClientSession::SetActiveSubTransaction(
Format("Set active sub transaction $0, when no transaction is running",
req.sub_transaction_id()));

DCHECK_GE(req.sub_transaction_id(), 0);
transaction->SetActiveSubTransaction(req.sub_transaction_id());
return Status::OK();
}
@@ -786,6 +829,11 @@ PgClientSession::SetupSession(const PgPerformRequestPB& req, CoarseTimePoint dea

session->SetDeadline(deadline);

if (transaction) {
DCHECK_GE(options.active_sub_transaction_id(), 0);
transaction->SetActiveSubTransaction(options.active_sub_transaction_id());
}

return std::make_pair(session, used_read_time);
}

10 changes: 7 additions & 3 deletions src/yb/yql/pggate/pg_client.cc
@@ -246,9 +246,12 @@ class PgClient::Impl {
return ResponseStatus(resp);
}

Status RollbackToSubTransaction(SubTransactionId id) {
Status RollbackToSubTransaction(SubTransactionId id, tserver::PgPerformOptionsPB* options) {
tserver::PgRollbackToSubTransactionRequestPB req;
req.set_session_id(session_id_);
if (options) {
options->Swap(req.mutable_options());
}
req.set_sub_transaction_id(id);

tserver::PgRollbackToSubTransactionResponsePB resp;
@@ -658,8 +661,9 @@ Status PgClient::SetActiveSubTransaction(
return impl_->SetActiveSubTransaction(id, options);
}

Status PgClient::RollbackToSubTransaction(SubTransactionId id) {
return impl_->RollbackToSubTransaction(id);
Status PgClient::RollbackToSubTransaction(
SubTransactionId id, tserver::PgPerformOptionsPB* options) {
return impl_->RollbackToSubTransaction(id, options);
}

Status PgClient::ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) {
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/pg_client.h
@@ -98,7 +98,7 @@ class PgClient {

Status SetActiveSubTransaction(
SubTransactionId id, tserver::PgPerformOptionsPB* options);
Status RollbackToSubTransaction(SubTransactionId id);
Status RollbackToSubTransaction(SubTransactionId id, tserver::PgPerformOptionsPB* options);

Status ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req);

27 changes: 10 additions & 17 deletions src/yb/yql/pggate/pg_session.cc
@@ -223,9 +223,9 @@ class PgSession::RunHelper {

const auto row_mark_type = GetRowMarkType(*op);
const auto txn_priority_requirement =
pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED ||
RowMarkNeedsHigherPriority(row_mark_type)
? kHigherPriorityRange : kLowerPriorityRange;
pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED
? kHighestPriority :
(RowMarkNeedsHigherPriority(row_mark_type) ? kHigherPriorityRange : kLowerPriorityRange);
read_only = read_only && !IsValidRowMarkType(row_mark_type);

return pg_session_.pg_txn_manager_->CalculateIsolation(
@@ -721,20 +721,11 @@ Status PgSession::SetActiveSubTransaction(SubTransactionId id) {
// ensuring that previous operations use previous SubTransactionMetadata. If we do not flush here,
// already queued operations may incorrectly use this newly modified SubTransactionMetadata when
// they are eventually sent to DocDB.
VLOG(4) << "SetActiveSubTransactionId " << id;
RETURN_NOT_OK(FlushBufferedOperations());
tserver::PgPerformOptionsPB* options_ptr = nullptr;
tserver::PgPerformOptionsPB options;
if (pg_txn_manager_->GetIsolationLevel() == IsolationLevel::NON_TRANSACTIONAL) {
auto txn_priority_requirement = kLowerPriorityRange;
if (pg_txn_manager_->GetPgIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
txn_priority_requirement = kHighestPriority;
}
RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(
false /* read_only_op */, txn_priority_requirement));
options_ptr = &options;
pg_txn_manager_->SetupPerformOptions(&options);
}
return pg_client_.SetActiveSubTransaction(id, options_ptr);
pg_txn_manager_->SetActiveSubTransactionId(id);

return Status::OK();
}

Status PgSession::RollbackToSubTransaction(SubTransactionId id) {
@@ -745,7 +736,9 @@ Status PgSession::RollbackToSubTransaction(SubTransactionId id) {
// rpc layer and beyond. For such ops, rely on aborted sub txn list in status tablet to invalidate
// writes which will be asynchronously written to txn participants.
RETURN_NOT_OK(FlushBufferedOperations());
return pg_client_.RollbackToSubTransaction(id);
tserver::PgPerformOptionsPB options;
pg_txn_manager_->SetupPerformOptions(&options);
return pg_client_.RollbackToSubTransaction(id, &options);
}

void PgSession::ResetHasWriteOperationsInDdlMode() {
8 changes: 8 additions & 0 deletions src/yb/yql/pggate/pg_txn_manager.cc
@@ -329,6 +329,10 @@ Status PgTxnManager::RestartReadPoint() {
return Status::OK();
}
void PgTxnManager::SetActiveSubTransactionId(SubTransactionId id) {
active_sub_transaction_id_ = id;
}
Status PgTxnManager::CommitTransaction() {
return FinishTransaction(Commit::kTrue);
}
@@ -369,6 +373,7 @@ void PgTxnManager::ResetTxnAndSession() {
priority_ = 0;
in_txn_limit_ = HybridTime();
++txn_serial_no_;
active_sub_transaction_id_ = 0;
enable_follower_reads_ = false;
read_only_ = false;
@@ -409,10 +414,13 @@ std::string PgTxnManager::TxnStateDebugStr() const {
uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) {
if (!ddl_mode_ && !txn_in_progress_) {
++txn_serial_no_;
active_sub_transaction_id_ = 0;
}
options->set_isolation(isolation_level_);
options->set_ddl_mode(ddl_mode_);
options->set_txn_serial_no(txn_serial_no_);
options->set_active_sub_transaction_id(active_sub_transaction_id_);
if (txn_in_progress_ && in_txn_limit_) {
options->set_in_txn_limit_ht(in_txn_limit_.ToUint64());
}
2 changes: 2 additions & 0 deletions src/yb/yql/pggate/pg_txn_manager.h
@@ -62,6 +62,7 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> {
Status RestartTransaction();
Status ResetTransactionReadPoint();
Status RestartReadPoint();
void SetActiveSubTransactionId(SubTransactionId id);
Status CommitTransaction();
Status AbortTransaction();
Status SetPgIsolationLevel(int isolation);
@@ -106,6 +107,7 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> {
bool txn_in_progress_ = false;
IsolationLevel isolation_level_ = IsolationLevel::NON_TRANSACTIONAL;
uint64_t txn_serial_no_ = 0;
SubTransactionId active_sub_transaction_id_ = 0;
bool need_restart_ = false;
bool need_defer_read_point_ = false;
tserver::ReadTimeManipulation read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;