Skip to content

Commit

Permalink
[yugabyte#12767] YSQL: Send list of aborted sub txns to the status ta…
Browse files Browse the repository at this point in the history
…blet during a savepoint rollback + 2 bug fixes

Summary:
As part of this change, YSQL will complete a savepoint rollback only after it
sends the list of aborted sub txns to the transaction status tablet using the
UpdateTransactionRequestPB heartbeat.

This is to ensure that after rolling back some part of transaction T1, any
statement in another transaction T2 should be able to see that the rolled back
provisional writes are invalid/ aborted.

Other bug fixes -

(1) a minor bug in the 4fb1676 -- the list of aborted sub txns, which is sent
periodically by YSQL, isn't always updated in the status tablet. The aborted
list was accepted by the transaction coordinator only if
`data.state.aborted().set_size() > aborted_.set_size()`, but the set_size() here
isn't the same as the number of aborted sub txns (see AbortedSubTransactionSet).

(2) Intents written for explicit row locks didn't persist sub txn id with them. This is
because for read queries with row level locks, the `write_batch` didn't have
sub txn id set. This is now done in `ReadQuery::DoPerform()`.

Test Plan:
Jenkins: urgent

./yb_build.sh --java-test org.yb.pgsql.TestPgConflictWithAbortedSubTxns#ensureNoConflictsWithAbortedSubTxns
./yb_build.sh --java-test org.yb.pgsql.TestPgIsolationRegress#isolationRegress
    - Enabled aborted-keyrevoke and delete-abort-savept-2 in the isolation test schedule

Reviewers: esheng, rsami

Reviewed By: esheng, rsami

Subscribers: zyu, bogdan, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D17358
  • Loading branch information
pkj415 committed Jun 19, 2022
1 parent 8661e85 commit 85ac8d8
Show file tree
Hide file tree
Showing 24 changed files with 399 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

package org.yb.pgsql;

import static org.yb.AssertionWrappers.*;

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.yb.util.ThreadUtil;
import org.yb.util.YBTestRunnerNonTsanOnly;

/**
* This test mimics the scenario in #12767 -
*
* Before this commit's fix, the following scenario leads to a situation where 2 single statement
* read committed transactions block each other -
*
* CREATE TABLE test (id int primary key, v timestamp with time zone NOT NULL);
* CREATE INDEX idx ON test USING btree (v);
*
* 1. 2 read committed isolation transactions are started in separate sessions with begin;
* 2. Both sessions get the request from client to execute a INSERT ON CONFLICT DO UPDATE query on a
* row in the main table.
*
* INSERT INTO test AS x (id, v) VALUES (1, statement_timestamp())
* ON CONFLICT ON CONSTRAINT test_pkey
* DO UPDATE SET v = statement_timestamp() WHERE x.id = excluded.id
*
* 3. Backends of both sessions first read the main table to check if the key already exists. Since
* it does, both backends then try to perform the ON CONFLICT DO UPDATE by issuing three rpcs
* simultaneously to tserver processes -
*
* i) a PGSQL_UPDATE to the main table to update the v time
* ii) a PGSQL_DELETE to the secondary index to remove the entry with existing v value
* iii) a PGSQL_UPSERT to the secondary index to insert a new index entry with new v value
*
* 4. Rpc [a] of session 1 reaches the main table first and performs the write.
* Rpc [b] (and/or [c]) of session 2 reaches the index table first and performs the write there.
*
* 5. So, session 1 has successfully written a provisional entry to the main table and session 2 has
* successfully written provisional entries to the index table.
*
* 6. Now, the other rpcs in both session will fail due to a conflict i.e., rpc [b] and [c] of
* session 1 and rpc [a] of session 2 will fail.
*
* 7. Both sessions, after facing the conflict, will retry the statement by rolling back to the
* internal savepoint registered before the statement's execution (as is done by our read
* committed implementation). Rolling back will effectively mark the provisional data written by
* earlier rpcs in the statement as invalid so that the backends can retry their statement and no
* other transactions should be able to see those invalid provisional entries.
*
* 8. However, even after rolling back savepoints, other transactions consider the rolled back
* provisional entries for conflict resolution and hence still face conflicts. This is because
* the list of aborted sub txns isn't available to the txn participants during conflict
* detection.
*
* 9. This leads to both sessions retrying and facing false conflicts till statement timeout is hit.
*/
@RunWith(value = YBTestRunnerNonTsanOnly.class)
public class TestPgConflictWithAbortedSubTxns extends BasePgSQLTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestPgConflictWithAbortedSubTxns.class);

private static final int NUM_THREADS = 9;
private static final int TEST_DURATION_SECS = 120;
private static final int STATEMENT_TIMEOUT_MS = 10 * 1000;

@Override
protected Map<String, String> getTServerFlags() {
Map<String, String> flags = super.getTServerFlags();
flags.put("yb_enable_read_committed_isolation", "true");
flags.put("ysql_pg_conf_csv", "statement_timeout=" + STATEMENT_TIMEOUT_MS);
return flags;
}

@Test
public void ensureNoConflictsWithAbortedSubTxns() throws Exception {
try (Statement statement = connection.createStatement()) {
statement.execute(
"CREATE TABLE test (id int primary key, v timestamp with time zone NOT NULL)");
statement.execute("CREATE INDEX idx ON test USING btree (v)");
}

ExecutorService es = Executors.newFixedThreadPool(NUM_THREADS);
List<Future<?>> futures = new ArrayList<>();
List<Runnable> runnables = new ArrayList<>();

for (int i = 0; i < NUM_THREADS; i++) {
runnables.add(() -> {
try (Connection conn =
getConnectionBuilder().withIsolationLevel(IsolationLevel.READ_COMMITTED)
.withAutoCommit(AutoCommit.ENABLED).connect();
Statement stmt = conn.createStatement();) {

long end_time = System.currentTimeMillis() + TEST_DURATION_SECS*1000;

while (System.currentTimeMillis() < end_time) {
stmt.execute("BEGIN");
stmt.execute(
"INSERT INTO test AS x (id, v) VALUES (1, statement_timestamp()) " +
"ON CONFLICT ON CONSTRAINT test_pkey " +
"DO UPDATE SET v = statement_timestamp() WHERE x.id = excluded.id"
);
stmt.execute("COMMIT");
}
} catch (Exception ex) {
fail("Failed due to exception: " + ex.getMessage());
}
});
}

for (Runnable r : runnables) {
futures.add(es.submit(r));
}

try {
LOG.info("Waiting for all threads");
for (Future<?> future : futures) {
future.get(TEST_DURATION_SECS + 10, TimeUnit.SECONDS);
}
} catch (TimeoutException ex) {
LOG.warn("Threads info:\n\n" + ThreadUtil.getAllThreadsInfo());
fail("Waiting for threads timed out, this is unexpected!");
}

try (Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE test");
}
}
}
10 changes: 7 additions & 3 deletions src/postgres/src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -4156,6 +4156,8 @@ DefineSavepoint(const char *name)
/* Normal subtransaction start */
PushTransaction();
s = CurrentTransactionState; /* changed by push */
elog(DEBUG2, "new sub txn created by savepoint, subtxn_id: %d",
s->subTransactionId);

/*
* Savepoint names, like the TransactionState block itself, live
Expand Down Expand Up @@ -4442,7 +4444,7 @@ RollbackToSavepoint(const char *name)
elog(FATAL, "RollbackToSavepoint: unexpected state %s",
BlockStateAsString(xact->blockState));

YBCRollbackSubTransaction(target->subTransactionId);
YBCRollbackToSubTransaction(target->subTransactionId);
}

/*
Expand Down Expand Up @@ -4493,6 +4495,8 @@ BeginInternalSubTransaction(const char *name)
/* Normal subtransaction start */
PushTransaction();
s = CurrentTransactionState; /* changed by push */
elog(DEBUG2, "new sub txn created internally, subtxn_id: %d",
s->subTransactionId);

/*
* Savepoint names, like the TransactionState block itself, live
Expand Down Expand Up @@ -4539,7 +4543,6 @@ BeginInternalSubTransaction(const char *name)
*/
void
BeginInternalSubTransactionForReadCommittedStatement() {
elog(DEBUG2, "Begin internal sub txn for statement in READ COMMITTED isolation");

YBFlushBufferedOperations();
TransactionState s = CurrentTransactionState;
Expand All @@ -4556,6 +4559,7 @@ BeginInternalSubTransactionForReadCommittedStatement() {
/* Normal subtransaction start */
PushTransaction();
s = CurrentTransactionState; /* changed by push */
elog(DEBUG2, "new internal sub txn in READ COMMITTED subtxn_id: %d", s->subTransactionId);

s->name = MemoryContextStrdup(TopTransactionContext, YB_READ_COMMITTED_INTERNAL_SUB_TXN_NAME);

Expand Down Expand Up @@ -5158,7 +5162,7 @@ AbortSubTransaction(void)
AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
}

YBCRollbackSubTransaction(s->subTransactionId);
YBCRollbackToSubTransaction(s->subTransactionId);

/*
* Restore the upper transaction's read-only state, too. This should be
Expand Down
4 changes: 2 additions & 2 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,10 @@ YBCSetActiveSubTransaction(SubTransactionId id)
}

void
YBCRollbackSubTransaction(SubTransactionId id)
YBCRollbackToSubTransaction(SubTransactionId id)
{
if (YBSavepointsEnabled())
HandleYBStatus(YBCPgRollbackSubTransaction(id));
HandleYBStatus(YBCPgRollbackToSubTransaction(id));
}

bool
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/include/pg_yb_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ extern void YBCAbortTransaction();

extern void YBCSetActiveSubTransaction(SubTransactionId id);

extern void YBCRollbackSubTransaction(SubTransactionId id);
extern void YBCRollbackToSubTransaction(SubTransactionId id);

/*
* Return true if we want to allow PostgreSQL's own locking. This is needed
Expand Down
4 changes: 3 additions & 1 deletion src/postgres/src/test/isolation/yb_pg_isolation_schedule
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ test: insert-conflict-do-update
test: multixact-no-deadlock
test: read-only-anomaly
test: nowait-2
test: nowait-3
test: nowait-3
test: aborted-keyrevoke
test: delete-abort-savept-2
Loading

0 comments on commit 85ac8d8

Please sign in to comment.