Skip to content

Commit

Permalink
[BACKPORT 2024.1][#22935] YSQL: Use db oid in the tserver's sequence …
Browse files Browse the repository at this point in the history
…cache entry key

Summary:
Original commit: 214d44a / D36182
Since the tserver's sequence cache was introduced, the entry key has been
the sequence's oid. However, that oid is not unique: each database has its
own oid generator, and each new database starts its oids from 16384.
Hence there is a high probability of collision in the sequence cache, which
has one instance per node and is shared between all the databases.

The solution is to use both the database oid and the sequence oid as the
entry key. The database oid is unique among all databases within the cluster,
and the oids of the sequences are unique within a database.

Initially the bug was discovered when a database was dropped and
recreated, because the cache entries are not invalidated when a sequence
is dropped. We may want to add invalidation logic; however, the problem
isn't critical with a composite key: oids of dropped databases and other
objects are not recycled.
Jira: DB-11851

Test Plan:
./yb_build.sh --java-test org.yb.pgsql.TestPgSequences#testMultiDbIsolation
./yb_build.sh --java-test org.yb.pgsql.TestPgSequencesWithServerCache#testMultiDbIsolation

Reviewers: jason

Reviewed By: jason

Subscribers: yql, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36214
  • Loading branch information
andrei-mart committed Jun 27, 2024
1 parent 860283d commit 8f4d569
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 17 deletions.
63 changes: 63 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java
Original file line number Diff line number Diff line change
Expand Up @@ -971,4 +971,67 @@ public void testAlterSequence() throws Exception {
assertEquals(Long.MAX_VALUE, rs.getLong("nextval"));
}
}

/**
 * Worker thread that creates, uses and drops a sequence in a given database, verifying that the
 * generated numbers are strictly sequential (1, 2, 3, ...). Sequential values confirm there is
 * no interference from concurrent activity elsewhere in the cluster.
 *
 * The outcome is reported through {@link #isSucceeded()}; failures are logged, not rethrown.
 */
private class TestSequenceIsolation extends Thread {
  private static final String SEQ_NAME = "myseq1";
  // Number of nextval() calls per worker. Deliberately not a multiple of the sequence's
  // CACHE size (100) used in run().
  private static final int NEXT_COUNT = 997;

  // Database this worker connects to.
  private final String dbName;
  // Set to true only after the whole create/use/drop cycle has completed without error.
  private volatile boolean success = false;

  public TestSequenceIsolation(String dbName) {
    this.dbName = dbName;
  }

  @Override
  public void run() {
    try (Connection conn = getConnectionBuilder().withDatabase(dbName).connect();
         Statement stmt = conn.createStatement()) {
      stmt.execute(String.format("CREATE SEQUENCE %s CACHE 100", SEQ_NAME));
      for (int i = 1; i <= NEXT_COUNT; i++) {
        // Close each result set explicitly instead of relying on the next executeQuery().
        try (ResultSet rs = stmt.executeQuery(String.format("SELECT nextval('%s')", SEQ_NAME))) {
          assertTrue(rs.next());
          assertEquals(i, rs.getInt(1));
          assertFalse(rs.next());
        }
      }
      stmt.execute(String.format("DROP SEQUENCE %s", SEQ_NAME));
      success = true;
    } catch (Exception | AssertionError ex) {
      // Assertion failures are AssertionError (an Error, not an Exception); catch them as well
      // so that a wrong sequence value is logged together with the database name rather than
      // escaping the thread as an uncaught error.
      LOG.error(String.format("Failed sequence isolation test for database %s: ", dbName), ex);
    }
  }

  /**
   * Waits for the worker thread to finish and reports its outcome.
   *
   * @return true if the thread completed the whole create/use/drop cycle without error
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public boolean isSucceeded() throws InterruptedException {
    join();
    return success;
  }

}

/**
 * Test that sequences in different databases can be used independently from each other.
 *
 * Creates several databases, then runs one {@link TestSequenceIsolation} worker per database
 * concurrently. Every worker creates a sequence with the same name, so the sequences in the
 * different databases must not interfere with each other for the workers to succeed.
 *
 * @throws Exception if database creation fails or the test thread is interrupted
 */
@Test
public void testMultiDbIsolation() throws Exception {
  final int DB_COUNT = 3;
  try (Statement stmt = connection.createStatement()) {
    for (int i = 1; i <= DB_COUNT; i++) {
      stmt.execute(String.format("CREATE DATABASE mydb%d", i));
    }
  }
  ArrayList<TestSequenceIsolation> workers = new ArrayList<>(DB_COUNT);
  for (int i = 1; i <= DB_COUNT; i++) {
    TestSequenceIsolation worker = new TestSequenceIsolation(String.format("mydb%d", i));
    worker.start();
    workers.add(worker);
  }
  // Join every worker before asserting, so that a failure in one database does not leave the
  // remaining worker threads running after the test method has exited.
  ArrayList<String> failedDbs = new ArrayList<>();
  for (int i = 0; i < workers.size(); i++) {
    if (!workers.get(i).isSucceeded()) {
      // Workers were created in order for mydb1..mydbN.
      failedDbs.add(String.format("mydb%d", i + 1));
    }
  }
  if (!failedDbs.isEmpty()) {
    LOG.error("Sequence isolation failed for databases: " + failedDbs);
  }
  assertTrue(failedDbs.isEmpty());
}
}
13 changes: 7 additions & 6 deletions src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,6 @@ Status PgClientSession::FetchSequenceTuple(
using pggate::PgDocData;
using pggate::PgWireDataHeader;

const int64_t sequence_id = req.seq_oid();
const int64_t inc_by = req.inc_by();
const bool use_sequence_cache = FLAGS_ysql_sequence_cache_method == "server";
std::shared_ptr<PgSequenceCache::Entry> entry;
Expand All @@ -1633,6 +1632,8 @@ Status PgClientSession::FetchSequenceTuple(
}
});
if (use_sequence_cache) {
const PgObjectId sequence_id(
narrow_cast<uint32_t>(req.db_oid()), narrow_cast<uint32_t>(req.seq_oid()));
entry = VERIFY_RESULT(
sequence_cache_.GetWhenAvailable(sequence_id, ToSteady(context->GetClientDeadline())));

Expand All @@ -1656,7 +1657,7 @@ Status PgClientSession::FetchSequenceTuple(
RETURN_NOT_OK(
(SetCatalogVersion<PgFetchSequenceTupleRequestPB, PgsqlWriteRequestPB>(req, write_request)));
write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid());
write_request->add_partition_column_values()->mutable_value()->set_int64_value(sequence_id);
write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid());

auto* fetch_sequence_params = write_request->mutable_fetch_sequence_params();
fetch_sequence_params->set_fetch_count(req.fetch_count());
Expand Down Expand Up @@ -1684,22 +1685,22 @@ Status PgClientSession::FetchSequenceTuple(
PgDocData::LoadCache(context->sidecars().GetFirst(), &row_count, &cursor);
if (row_count != 2) {
return STATUS_SUBSTITUTE(InternalError, "Invalid row count has been fetched from sequence $0",
sequence_id);
req.seq_oid());
}

// Get the range start
if (PgDocData::ReadHeaderIsNull(&cursor)) {
return STATUS_SUBSTITUTE(InternalError,
"Invalid value range start has been fetched from sequence $0",
sequence_id);
req.seq_oid());
}
auto first_value = PgDocData::ReadNumber<int64_t>(&cursor);

// Get the range end
if (PgDocData::ReadHeaderIsNull(&cursor)) {
return STATUS_SUBSTITUTE(InternalError,
"Invalid value range end has been fetched from sequence $0",
sequence_id);
req.seq_oid());
}
auto last_value = PgDocData::ReadNumber<int64_t>(&cursor);

Expand All @@ -1709,7 +1710,7 @@ Status PgClientSession::FetchSequenceTuple(

RSTATUS_DCHECK(
optional_sequence_value.has_value(), InternalError, "Value for sequence $0 was not found.",
sequence_id);
req.seq_oid());
// Since the tserver cache is enabled, the connection cache size is implicitly 1 so the first
// and last value are the same.
last_value = first_value = *optional_sequence_value;
Expand Down
6 changes: 2 additions & 4 deletions src/yb/tserver/pg_sequence_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,12 @@ void PgSequenceCache::Entry::NotifyWaiter() {
}

Result<std::shared_ptr<PgSequenceCache::Entry>> PgSequenceCache::GetWhenAvailable(
int64_t sequence_id, const MonoTime& deadline) {
VLOG(3) << "Checking if entry for sequence id " << sequence_id
<< "exists in tserver sequence cache";

const PgObjectId& sequence_id, const MonoTime& deadline) {
std::shared_ptr<Entry> entry;
{
std::lock_guard cache_lock_guard(lock_);
if (!cache_.contains(sequence_id)) {
VLOG(3) << "Create cache entry for sequence id " << sequence_id;
cache_[sequence_id] = std::make_shared<Entry>();
}

Expand Down
13 changes: 6 additions & 7 deletions src/yb/tserver/pg_sequence_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
#include <unordered_map>
#include <unordered_set>

#include "yb/common/pg_types.h"
#include "yb/util/condition_variable.h"
#include "yb/util/locks.h"
#include "yb/util/mutex.h"

namespace yb {
namespace tserver {
namespace yb::tserver {

class PgSequenceCache {
public:
Expand Down Expand Up @@ -60,13 +60,12 @@ class PgSequenceCache {
// Wait on the cv until the id is available and create an entry for this id if one didn't exist.
// This function returns the entry if successfully waited, or a time out status if the thread
// timed out while waiting.
Result<std::shared_ptr<Entry>> GetWhenAvailable(int64_t sequence_id, const MonoTime& deadline)
EXCLUDES(lock_);
Result<std::shared_ptr<Entry>> GetWhenAvailable(
const PgObjectId& sequence_id, const MonoTime& deadline) EXCLUDES(lock_);

private:
simple_spinlock lock_;
std::unordered_map<int64_t, std::shared_ptr<Entry>> cache_ GUARDED_BY(lock_);
std::unordered_map<PgObjectId, std::shared_ptr<Entry>, PgObjectIdHash> cache_ GUARDED_BY(lock_);
};

} // namespace tserver
} // namespace yb
} // namespace yb::tserver

0 comments on commit 8f4d569

Please sign in to comment.