From 8835eb1d626053fecd246ccf799c903f5e18494b Mon Sep 17 00:00:00 2001 From: Piyush Jain Date: Tue, 15 Mar 2022 12:34:51 -0700 Subject: [PATCH] [BACKPORT 2.12] [#11760] YSQL: Support NOWAIT for READ COMMITTED isolation level Summary: The NOWAIT clause with a SELECT (that is attempting to take explicit lock(s)) ensures that if the statement sees another txn holding a conflicting lock mode, it will error out immediately with an error message like follows - "could not obtain lock on row in relation". The NOWAIT clause currently has no effect for any isolation level in YSQL. In READ COMMITTED isolation level - the statement would face a kConflict error and the statement retry mechanism of this isolation would retry indefinitely till the lock(s) are acquired (in yb_attempt_to_restart_on_error()). In REPEATABLE READ and SERIALIZABLE isolation level - the statement would either abort the conflicting txn(s) or would abort itself and throw a kConflict error. The former would occur if the statement's txn has higher priority than all conflicting txn(s). The diff fixes the semantics for READ COMMITTED isolation level as a first step since that is easier to do. All txns in READ COMMITTED isolation internally use the same priority and so the txn always faces a kConflict error. We just have to skip retrying the statement for such errors if the NOWAIT clause is specified. Original commit: 7c705d27927f3eaff0d149f5760f5a9d856ef9cd / D16007 Test Plan: Jenkins: rebase: 2.12 ./yb_build.sh --java-test org.yb.pgsql.TestPgExplicitLocks#testNoWait Reviewers: dmitry Reviewed By: dmitry Subscribers: yql Differential Revision: https://phabricator.dev.yugabyte.com/D16417 --- .../org/yb/pgsql/TestPgExplicitLocks.java | 95 +++++++++++++++++++ .../src/backend/executor/nodeLockRows.c | 14 +++ 2 files changed, 109 insertions(+) diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgExplicitLocks.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgExplicitLocks.java index a15a7e3c205d..3cf535d3da4b 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgExplicitLocks.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgExplicitLocks.java @@ -13,6 +13,7 @@ package org.yb.pgsql; +import org.apache.commons.lang3.StringUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -23,6 +24,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.Map; import com.yugabyte.util.PSQLException; import static org.yb.AssertionWrappers.*; @@ -31,6 +33,13 @@ public class TestPgExplicitLocks extends BasePgSQLTest { private static final Logger LOG = LoggerFactory.getLogger(TestPgSelect.class); + @Override + protected Map getTServerFlags() { + Map flagMap = super.getTServerFlags(); + flagMap.put("yb_enable_read_committed_isolation", "true"); + return flagMap; + } + @Test public void testExplicitLocks() throws Exception { setupSimpleTable("explicitlocks"); @@ -374,4 +383,90 @@ public void testLocksSerializableIsolation() throws Exception { public void testLocksSnapshotIsolation() throws Exception { testLocksIsolationLevel(IsolationLevel.REPEATABLE_READ); } + + @Test + public void testNoWait() throws Exception { + ConnectionBuilder builder = getConnectionBuilder(); + try (Connection conn1 = builder.connect(); + Connection conn2 = builder.connect(); + Statement stmt1 = conn1.createStatement(); + Statement stmt2 = conn2.createStatement(); + Connection extraConn = builder.connect(); + Statement extraStmt = extraConn.createStatement()) { + extraStmt.execute("CREATE TABLE test (k INT PRIMARY KEY, v INT)"); + extraStmt.execute("INSERT INTO test VALUES (1, 1)"); + + // The below SELECT is done so that catalog reads are done before the NOWAIT statement. This + // helps us accurately measure the number of read rpcs performed during the NOWAIT query for a + // later assertion. + // + // The sleep is added to ensure that the cache refresh is complete before we measure + // the number of read rpcs. + stmt2.execute("SELECT * FROM test"); + Thread.sleep(2000); + + // Case 1: for REPEATABLE READ (not fully supported yet as explained below). + + // This test uses 2 txns which can be assigned random priorities. Txn1 does just a SELECT FOR + // UPDATE. Txn2 later does the same but with the NOWAIT clause. There are 2 possible outcomes + // based on whether txn2 is assigned higher or lower priority than txn1: + // 1. Txn2 has higher priority: txn1 is aborted. + // 2. Txn2 has lower priority: txn2 is aborted. + // + // TODO(Piyush): The semantics of NOWAIT require that txn2 is aborted always. The statement in + // with NOWAIT should not kill other txns. So, the semantics of case 1 above need to be fixed. + // + // Since only case (2) works as of now, we need to ensure that txn2 has lower priority. + stmt1.execute("SET yb_transaction_priority_lower_bound = 0.5"); + stmt2.execute("SET yb_transaction_priority_upper_bound = 0.4"); + + stmt1.execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + stmt2.execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + stmt1.execute("SELECT * FROM test WHERE k=1 FOR UPDATE"); + + Long read_count_before = getTServerMetric( + "handler_latency_yb_tserver_TabletServerService_Read").count; + LOG.info("read_count_before=" + read_count_before); + try { + stmt2.execute("SELECT * FROM test WHERE k=1 FOR UPDATE NOWAIT"); + assertTrue("Should not reach here since the statement is supposed to fail", false); + } catch (SQLException e) { + // If txn2 had a lower priority than txn1, instead of attempting retries for + // ysql_max_write_restart_attempts, it would fail immediately due to the NOWAIT clause + // with the appropriate message. + assertTrue(StringUtils.containsIgnoreCase(e.getMessage(), + "ERROR: could not obtain lock on row in relation \"test\"")); + + // Assert that we failed immediately without retrying at all. This is done by ensuring that + // we make only 2 read rpc call to tservers - one for reading the tuple and one for locking + // the row. + Long read_count_after = getTServerMetric( + "handler_latency_yb_tserver_TabletServerService_Read").count; + LOG.info("read_count_after=" + read_count_after); + assertTrue((read_count_after - read_count_before) == 2); + stmt1.execute("COMMIT"); + stmt2.execute("ROLLBACK"); + } + + // Case 2: for READ COMMITTED isolation. + // All txns use the same priority in this isolation level. + stmt1.execute("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"); + stmt2.execute("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"); + stmt1.execute("SELECT * FROM test WHERE k=1 FOR UPDATE"); + + read_count_before = getTServerMetric( + "handler_latency_yb_tserver_TabletServerService_Read").count; + LOG.info("read_count_before=" + read_count_before); + runInvalidQuery(stmt2, "SELECT * FROM test WHERE k=1 FOR UPDATE NOWAIT", + "ERROR: could not obtain lock on row in relation \"test\""); + + // Assert that we failed immediately without retrying at all. + Long read_count_after = getTServerMetric( + "handler_latency_yb_tserver_TabletServerService_Read").count; + LOG.info("read_count_after=" + read_count_after); + assertTrue((read_count_after - read_count_before) == 2); + stmt1.execute("COMMIT"); + stmt2.execute("ROLLBACK"); + } + } } diff --git a/src/postgres/src/backend/executor/nodeLockRows.c b/src/postgres/src/backend/executor/nodeLockRows.c index 67e9d6aca956..db50ad907654 100644 --- a/src/postgres/src/backend/executor/nodeLockRows.c +++ b/src/postgres/src/backend/executor/nodeLockRows.c @@ -254,10 +254,24 @@ ExecLockRows(PlanState *pstate) * with IsolationUsesXactSnapshot(). */ if (true) + { + if (erm->waitPolicy == LockWaitError) + { + // In case the user has specified NOWAIT, the intention is to error out immediately. If + // we raise TransactionErrorCode::kConflict, the statement might be retried by our + // retry logic in yb_attempt_to_restart_on_error(). + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on row in relation \"%s\"", + RelationGetRelationName(erm->relation)))); + } + ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"), yb_txn_errcode(YBCGetTxnConflictErrorCode()))); + } + if (ItemPointerIndicatesMovedPartitions(&hufd.ctid)) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),