From 8f4d5693024cd71568637cc0d31f5991e17779b1 Mon Sep 17 00:00:00 2001 From: Andrei Martsinchyk Date: Thu, 27 Jun 2024 11:18:02 -0700 Subject: [PATCH] [BACKPORT 2024.1][#22935] YSQL: Use db oid in the tserver's sequence cache entry key Summary: Original commit: 214d44a7ce0b8beb59e9dd2a6bb14f5d829ab81d / D36182 Since the tserver's sequence cache was introduced, the entry key was the sequence's oid. However, it is not unique, each database has its own oid generator, and each new database starts its oids from 16384, hence high probability of collision in the sequence cache, which is one instance per node and shared between all the databases. Solution is to use both database oid and sequence oid as the entry key. Database oid is unique between all databases within the cluster, and the oids of the sequences are unique within the database. Initially the bug was discovered when a database was dropped and recreated, because the cache entries are not invalidated when sequence is dropped. We may want to add invalidation logic, however, the problem isn't critical with composite key: oids of dropped databases and other objects are not recycled. Jira: DB-11851 Test Plan: ./yb_build.sh --java-test org.yb.pgsql.TestPgSequences#testMultiDbIsolation ./yb_build.sh --java-test org.yb.pgsql.TestPgSequencesWithServerCache#testMultiDbIsolation Reviewers: jason Reviewed By: jason Subscribers: yql, ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D36214 --- .../java/org/yb/pgsql/TestPgSequences.java | 63 +++++++++++++++++++ src/yb/tserver/pg_client_session.cc | 13 ++-- src/yb/tserver/pg_sequence_cache.cc | 6 +- src/yb/tserver/pg_sequence_cache.h | 13 ++-- 4 files changed, 78 insertions(+), 17 deletions(-) diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java index 763a0dfb22a0..850b2a0c7a05 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java @@ -971,4 +971,67 @@ public void testAlterSequence() throws Exception { assertEquals(Long.MAX_VALUE, rs.getLong("nextval")); } } + + /** + * Create use and drop a sequence, verifying that generated numbers are sequential, hence + * confirming there is no interference with concurrent activities in the cluster. + */ + private class TestSequenceIsolation extends Thread { + private static final String SEQ_NAME = "myseq1"; + private static final int NEXT_COUNT = 997; + + private String dbName; + private volatile boolean success = false; + + public TestSequenceIsolation(String dbName) { + this.dbName = dbName; + } + + @Override + public void run() { + try (Connection conn = getConnectionBuilder().withDatabase(dbName).connect(); + Statement stmt = conn.createStatement()) { + stmt.execute(String.format("CREATE SEQUENCE %s CACHE 100", SEQ_NAME)); + for (int i = 1; i <= NEXT_COUNT; i++) { + ResultSet rs = stmt.executeQuery(String.format("SELECT nextval('%s')", SEQ_NAME)); + assertTrue(rs.next()); + assertEquals(i, rs.getInt(1)); + assertFalse(rs.next()); + } + stmt.execute(String.format("DROP SEQUENCE %s", SEQ_NAME)); + success = true; + } catch (Exception ex) { + LOG.error(String.format("Failed sequence isolation test for database %s: ", dbName), ex); + } + } + + public boolean isSucceeded() throws InterruptedException { + join(); + return success; + } + + } + + /** + * Test that sequences in different databases can be used independently from each other. + * @throws Exception + */ + @Test + public void testMultiDbIsolation() throws Exception { + final int DB_COUNT = 3; + try (Statement stmt = connection.createStatement()) { + for (int i = 1; i <= DB_COUNT; i++) { + stmt.execute(String.format("CREATE DATABASE mydb%d", i)); + } + } + ArrayList workers = new ArrayList(DB_COUNT); + for (int i = 1; i <= DB_COUNT; i++) { + TestSequenceIsolation worker = new TestSequenceIsolation(String.format("mydb%d", i)); + worker.start(); + workers.add(worker); + } + for (TestSequenceIsolation worker : workers) { + assertTrue(worker.isSucceeded()); + } + } } diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 5869706170b1..d9bf1bbd8b9b 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -1622,7 +1622,6 @@ Status PgClientSession::FetchSequenceTuple( using pggate::PgDocData; using pggate::PgWireDataHeader; - const int64_t sequence_id = req.seq_oid(); const int64_t inc_by = req.inc_by(); const bool use_sequence_cache = FLAGS_ysql_sequence_cache_method == "server"; std::shared_ptr entry; @@ -1633,6 +1632,8 @@ Status PgClientSession::FetchSequenceTuple( } }); if (use_sequence_cache) { + const PgObjectId sequence_id( + narrow_cast(req.db_oid()), narrow_cast(req.seq_oid())); entry = VERIFY_RESULT( sequence_cache_.GetWhenAvailable(sequence_id, ToSteady(context->GetClientDeadline()))); @@ -1656,7 +1657,7 @@ Status PgClientSession::FetchSequenceTuple( RETURN_NOT_OK( (SetCatalogVersion(req, write_request))); write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid()); - write_request->add_partition_column_values()->mutable_value()->set_int64_value(sequence_id); + write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid()); auto* fetch_sequence_params = write_request->mutable_fetch_sequence_params(); fetch_sequence_params->set_fetch_count(req.fetch_count()); @@ -1684,14 +1685,14 @@ Status PgClientSession::FetchSequenceTuple( PgDocData::LoadCache(context->sidecars().GetFirst(), &row_count, &cursor); if (row_count != 2) { return STATUS_SUBSTITUTE(InternalError, "Invalid row count has been fetched from sequence $0", - sequence_id); + req.seq_oid()); } // Get the range start if (PgDocData::ReadHeaderIsNull(&cursor)) { return STATUS_SUBSTITUTE(InternalError, "Invalid value range start has been fetched from sequence $0", - sequence_id); + req.seq_oid()); } auto first_value = PgDocData::ReadNumber(&cursor); @@ -1699,7 +1700,7 @@ Status PgClientSession::FetchSequenceTuple( if (PgDocData::ReadHeaderIsNull(&cursor)) { return STATUS_SUBSTITUTE(InternalError, "Invalid value range end has been fetched from sequence $0", - sequence_id); + req.seq_oid()); } auto last_value = PgDocData::ReadNumber(&cursor); @@ -1709,7 +1710,7 @@ Status PgClientSession::FetchSequenceTuple( RSTATUS_DCHECK( optional_sequence_value.has_value(), InternalError, "Value for sequence $0 was not found.", - sequence_id); + req.seq_oid()); // Since the tserver cache is enabled, the connection cache size is implicitly 1 so the first // and last value are the same. last_value = first_value = *optional_sequence_value; diff --git a/src/yb/tserver/pg_sequence_cache.cc b/src/yb/tserver/pg_sequence_cache.cc index 787a73483b29..d9ed26babedc 100644 --- a/src/yb/tserver/pg_sequence_cache.cc +++ b/src/yb/tserver/pg_sequence_cache.cc @@ -70,14 +70,12 @@ void PgSequenceCache::Entry::NotifyWaiter() { } Result> PgSequenceCache::GetWhenAvailable( - int64_t sequence_id, const MonoTime& deadline) { - VLOG(3) << "Checking if entry for sequence id " << sequence_id - << "exists in tserver sequence cache"; - + const PgObjectId& sequence_id, const MonoTime& deadline) { std::shared_ptr entry; { std::lock_guard cache_lock_guard(lock_); if (!cache_.contains(sequence_id)) { + VLOG(3) << "Create cache entry for sequence id " << sequence_id; cache_[sequence_id] = std::make_shared(); } diff --git a/src/yb/tserver/pg_sequence_cache.h b/src/yb/tserver/pg_sequence_cache.h index 90d8cd1c933a..e9b0ce578af4 100644 --- a/src/yb/tserver/pg_sequence_cache.h +++ b/src/yb/tserver/pg_sequence_cache.h @@ -18,12 +18,12 @@ #include #include +#include "yb/common/pg_types.h" #include "yb/util/condition_variable.h" #include "yb/util/locks.h" #include "yb/util/mutex.h" -namespace yb { -namespace tserver { +namespace yb::tserver { class PgSequenceCache { public: @@ -60,13 +60,12 @@ class PgSequenceCache { // Wait on the cv until the id is available and create an entry for this id if one didn't exist. // This function returns the entry if successfully waited, or a time out status if the thread // timed out while waiting. - Result> GetWhenAvailable(int64_t sequence_id, const MonoTime& deadline) - EXCLUDES(lock_); + Result> GetWhenAvailable( + const PgObjectId& sequence_id, const MonoTime& deadline) EXCLUDES(lock_); private: simple_spinlock lock_; - std::unordered_map> cache_ GUARDED_BY(lock_); + std::unordered_map, PgObjectIdHash> cache_ GUARDED_BY(lock_); }; -} // namespace tserver -} // namespace yb +} // namespace yb::tserver