Skip to content

Commit

Permalink
[YSQL][#2523] Fix read point for row lock stmt
Browse files Browse the repository at this point in the history
Summary:
* Set read times when row mark is set

  Read times are left invalid when row marks are set.  They should be
  set to have snapshot isolation reads happen at the transaction start
  time and for conflict detection to also use the transaction start
  time.  Modify logic to set and pass the read times.

* Detect intent conflicts with empty prefix

  The intent and regular records conflict detection in `ProcessIntent`
  does not handle cases when the intent key prefix is empty (signifying
  full table).  Handle this extra case crudely by comparing the key
  prefix with `ValueTypeAsChar::kGroupEnd`.

* Add new test for row lock statements

  Add test methods `PgMiniTest::TestInsertSelectRowLock`,
  `PgMiniTest::TestDeleteSelectRowLock`, and Gtests

  * `PgMiniTest.SerializableDeleteForKeyShare`
  * `PgMiniTest.SerializableDeleteForShare`
  * `PgMiniTest.SerializableInsertForKeyShare`
  * `PgMiniTest.SerializableInsertForShare`
  * `PgMiniTest.SnapshotDeleteForKeyShare`
  * `PgMiniTest.SnapshotDeleteForShare`
  * `PgMiniTest.SnapshotInsertForKeyShare`
  * `PgMiniTest.SnapshotInsertForShare`

  that call that test method with different arguments.

Test Plan:
* #2523
* Jenkins
* `./yb_build.sh`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SerializableDeleteForKeyShare`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SerializableDeleteForShare`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SerializableInsertForKeyShare`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SerializableInsertForShare`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SnapshotDeleteForKeyShare`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SnapshotDeleteForShare`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SnapshotInsertForKeyShare`
  * `--cxx-test pgwrapper_pg_mini-test --gtest_filter
    PgMiniTest.SnapshotInsertForShare`
  * `--java-test org.yb.pgsql.TestPgForeignKey`

Reviewers: mikhail

Reviewed By: mikhail

Subscribers: yql, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D7488
  • Loading branch information
Jason Kim committed Nov 6, 2019
1 parent 84abad3 commit 073b342
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 11 deletions.
4 changes: 0 additions & 4 deletions src/postgres/src/backend/utils/adt/ri_triggers.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,10 +830,6 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
queryoids[i] = pk_type;
}

/*
* TODO In YB mode we currently only allow foreign key DMLs
* in YB serializable mode -- so no need for key share here
*/
appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");

/* Prepare and save the plan */
Expand Down
5 changes: 4 additions & 1 deletion src/yb/docdb/conflict_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,10 @@ class TransactionConflictResolverContext : public ConflictResolverContext {

value_iter.Seek(key_slice);
KeyBytes buffer;
while (value_iter.Valid() && value_iter.key().starts_with(key_slice)) {
// Inspect records whose doc keys are children of the intent's doc key. If the intent's doc
// key is empty, it signifies an intent on the whole table.
while (value_iter.Valid() && (key_slice.starts_with(ValueTypeAsChar::kGroupEnd) ||
value_iter.key().starts_with(key_slice))) {
auto existing_key = value_iter.key();
auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&existing_key));
VLOG(4) << "Check value overwrite: " << transaction_id_
Expand Down
12 changes: 6 additions & 6 deletions src/yb/tserver/tablet_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,6 @@ void TabletServiceImpl::Read(const ReadRequestPB* req,
"Read request with row mark types must be part of a transaction"),
TabletServerErrorPB::OPERATION_NOT_SUPPORTED, &context);
}
serializable_isolation = true;
has_row_lock = true;
}
}
Expand All @@ -1212,7 +1211,7 @@ void TabletServiceImpl::Read(const ReadRequestPB* req,
LeaderTabletPeer leader_peer;
ReadContext read_context = {req, resp, &context};

if (serializable_isolation) {
if (serializable_isolation || has_row_lock) {
// At this point we expect that we don't have pure read serializable transactions, and
// always write read intents to detect conflicts with other writes.
leader_peer = LookupLeaderTabletOrRespond(
Expand Down Expand Up @@ -1246,9 +1245,9 @@ void TabletServiceImpl::Read(const ReadRequestPB* req,
// safe_ht_to_read is used only for read restart, so if read_time is valid, then we would respond
// with "restart required".
ReadHybridTime& read_time = read_context.read_time;
if (!has_row_lock) {
read_time = ReadHybridTime::FromReadTimePB(*req);
}

read_time = ReadHybridTime::FromReadTimePB(*req);

read_context.allow_retry = !read_time;
read_context.require_lease = tablet::RequireLease(
req->consistency_level() == YBConsistencyLevel::STRONG);
Expand Down Expand Up @@ -1281,14 +1280,15 @@ void TabletServiceImpl::Read(const ReadRequestPB* req,
host_port_pb.set_port(remote_address.port());
read_context.host_port_pb = &host_port_pb;

if (serializable_isolation) {
if (serializable_isolation || has_row_lock) {
WriteRequestPB write_req;
*write_req.mutable_write_batch()->mutable_transaction() = req->transaction();
if (has_row_lock) {
// This will have to be modified once we support more row locks.
// See https://github.com/yugabyte/yugabyte-db/issues/1199 and
// https://github.com/yugabyte/yugabyte-db/issues/2496.
write_req.mutable_write_batch()->add_row_mark_type(RowMarkType::ROW_MARK_SHARE);
read_context.read_time.ToPB(write_req.mutable_read_time());
}
write_req.set_tablet_id(req->tablet_id());
write_req.mutable_write_batch()->set_deprecated_may_have_metadata(true);
Expand Down
118 changes: 118 additions & 0 deletions src/yb/yql/pgwrapper/pg_mini-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ class PgMiniTest : public YBMiniClusterTestBase<MiniCluster> {
// are expected.
void TestReadRestart(bool deferrable = true);

// Run interleaved INSERT, SELECT with specified isolation level and row mark. Possible isolation
// levels are SNAPSHOT_ISOLATION and SERIALIZABLE_ISOLATION. Possible row marks are
// ROW_MARK_KEYSHARE and ROW_MARK_SHARE.
void TestInsertSelectRowLock(IsolationLevel isolation, RowMarkType row_mark);

// Run interleaved DELETE, SELECT with specified isolation level and row mark. Possible isolation
// levels are SNAPSHOT_ISOLATION and SERIALIZABLE_ISOLATION. Possible row marks are
// ROW_MARK_KEYSHARE and ROW_MARK_SHARE.
void TestDeleteSelectRowLock(IsolationLevel isolation, RowMarkType row_mark);

private:
std::unique_ptr<PgSupervisor> pg_supervisor_;
HostPort pg_host_port_;
Expand Down Expand Up @@ -247,6 +257,114 @@ TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestart)) {
TestReadRestart(false /* deferrable */);
}

void PgMiniTest::TestInsertSelectRowLock(IsolationLevel isolation, RowMarkType row_mark) {
const std::string isolation_str = (
isolation == IsolationLevel::SNAPSHOT_ISOLATION ? "REPEATABLE READ" : "SERIALIZABLE");
const std::string row_mark_str = (
row_mark == RowMarkType::ROW_MARK_KEYSHARE ? "KEY SHARE" : "SHARE");
constexpr auto kSleepTime = 1s;
constexpr int kKeys = 3;
PGConn read_conn = ASSERT_RESULT(Connect());
PGConn misc_conn = ASSERT_RESULT(Connect());
PGConn write_conn = ASSERT_RESULT(Connect());

// Set up table
ASSERT_OK(misc_conn.Execute("CREATE TABLE t (i INT PRIMARY KEY, j INT)"));
// TODO: remove this when issue #2857 is fixed.
std::this_thread::sleep_for(kSleepTime);
for (int i = 0; i < kKeys; ++i) {
ASSERT_OK(misc_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", i));
}

ASSERT_OK(read_conn.ExecuteFormat("BEGIN TRANSACTION ISOLATION LEVEL $0", isolation_str));
ASSERT_OK(read_conn.Fetch("SELECT '(setting read point)'"));
ASSERT_OK(write_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", kKeys));
auto result = read_conn.FetchFormat("SELECT * FROM t FOR $0", row_mark_str);
if (isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
// TODO: change to ASSERT_OK and expect kKeys rows when issue #2809 is fixed.
ASSERT_NOK(result);
ASSERT_TRUE(result.status().IsNetworkError()) << result.status();
ASSERT_EQ(PgsqlError(result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)
<< result.status();
ASSERT_STR_CONTAINS(result.status().ToString(), "Value write after transaction start");
ASSERT_OK(read_conn.Execute("ABORT"));
} else {
ASSERT_OK(result);
ASSERT_EQ(PQntuples(result.get().get()), kKeys + 1);
ASSERT_OK(read_conn.Execute("COMMIT"));
}
}

void PgMiniTest::TestDeleteSelectRowLock(IsolationLevel isolation, RowMarkType row_mark) {
const std::string isolation_str = (
isolation == IsolationLevel::SNAPSHOT_ISOLATION ? "REPEATABLE READ" : "SERIALIZABLE");
const std::string row_mark_str = (
row_mark == RowMarkType::ROW_MARK_KEYSHARE ? "KEY SHARE" : "SHARE");
constexpr auto kSleepTime = 1s;
constexpr int kKeys = 3;
PGConn read_conn = ASSERT_RESULT(Connect());
PGConn misc_conn = ASSERT_RESULT(Connect());
PGConn write_conn = ASSERT_RESULT(Connect());

// Set up table
ASSERT_OK(misc_conn.Execute("CREATE TABLE t (i INT PRIMARY KEY, j INT)"));
// TODO: remove this when issue #2857 is fixed.
std::this_thread::sleep_for(kSleepTime);
for (int i = 0; i < kKeys; ++i) {
ASSERT_OK(misc_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", i));
}

ASSERT_OK(read_conn.ExecuteFormat("BEGIN TRANSACTION ISOLATION LEVEL $0", isolation_str));
ASSERT_OK(read_conn.Fetch("SELECT '(setting read point)'"));
ASSERT_OK(write_conn.ExecuteFormat("DELETE FROM t WHERE i = $0", RandomUniformInt(0, kKeys - 1)));
auto result = read_conn.FetchFormat("SELECT * FROM t FOR $0", row_mark_str);
if (isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
ASSERT_NOK(result);
ASSERT_TRUE(result.status().IsNetworkError()) << result.status();
ASSERT_EQ(PgsqlError(result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)
<< result.status();
ASSERT_STR_CONTAINS(result.status().ToString(), "Value write after transaction start");
ASSERT_OK(read_conn.Execute("ABORT"));
} else {
// TODO: either this or the DELETE above should be ASSERT_NOK (related to issue #2831).
ASSERT_OK(result);
ASSERT_EQ(PQntuples(result.get().get()), kKeys - 1);
ASSERT_OK(read_conn.Execute("COMMIT"));
}
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForShare)) {
TestInsertSelectRowLock(IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_SHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForShare)) {
TestInsertSelectRowLock(IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_SHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForKeyShare)) {
TestInsertSelectRowLock(IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForKeyShare)) {
TestInsertSelectRowLock(IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForShare)) {
TestDeleteSelectRowLock(IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_SHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForShare)) {
TestDeleteSelectRowLock(IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_SHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForKeyShare)) {
TestDeleteSelectRowLock(IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForKeyShare)) {
TestDeleteSelectRowLock(IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableReadOnly)) {
PGConn read_conn = ASSERT_RESULT(Connect());
PGConn setup_conn = ASSERT_RESULT(Connect());
Expand Down

0 comments on commit 073b342

Please sign in to comment.