Skip to content

Commit

Permalink
[#9936] DocDB: Use PgClient for performing operations
Browse files Browse the repository at this point in the history
Summary:
The Postgres layer uses `YBClient` to perform read and write operations.
Since Postgres creates a separate process for each incoming connection, it creates a `YBClient` for each incoming connection.
But `YBClient` establishes connections to all tservers in the cluster and to the master leader.
As a result, in large systems with many incoming connections, we get a lot of connections between nodes.
It consumes resources and slows down the whole system.

This diff moves the read and write logic from the Postgres layer to the previously designed PgClient,
which establishes a single local connection from Postgres to the tserver.

Test Plan: Jenkins

Reviewers: dmitry

Reviewed By: dmitry

Subscribers: jason, yql, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D13244
  • Loading branch information
spolitov committed Mar 4, 2022
1 parent f1eb28e commit c5f5125
Show file tree
Hide file tree
Showing 105 changed files with 3,244 additions and 2,015 deletions.
4 changes: 3 additions & 1 deletion ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1361,8 +1361,10 @@ Status CatalogManager::RepartitionTable(const scoped_refptr<TableInfo> table,
ScopedInfoCommitter<TabletInfo> unlocker_old(&old_tablets);

// Change table's partition schema to the external snapshot's.
table_lock.mutable_data()->pb.mutable_partition_schema()->CopyFrom(
auto& table_pb = table_lock.mutable_data()->pb;
table_pb.mutable_partition_schema()->CopyFrom(
table_data->table_entry_pb.partition_schema());
table_pb.set_partition_list_version(table_pb.partition_list_version() + 1);

// Remove old tablets from TableInfo.
table->RemoveTablets(old_tablets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,8 @@ public class TestDropTableWithConcurrentTxn extends BasePgSQLTest {
"expired or aborted by a conflict";
private static final String RESOURCE_NONEXISTING_ERROR =
"does not exist";
private static final String TABLET_NONEXISTING_ERROR =
"Tablet deleted";
private static final String SCHEMA_VERSION_MISMATCH_ERROR =
"schema version mismatch for table";
private static final String TABLE_DELETED_ERROR =
"Table deleted";
private static final String NO_ERROR = "";
private static final boolean executeDmlBeforeDrop = true;
private static final boolean executeDmlAfterDrop = false;
Expand Down Expand Up @@ -304,12 +300,12 @@ public void testDmlTxnDrop() throws Exception {
runDmlTxnWithDropOnCurrentResource(Dml.INSERT, tableDrop, !withCachedMetadata,
executeDmlAfterDrop, RESOURCE_NONEXISTING_ERROR);
runDmlTxnWithDropOnCurrentResource(Dml.INSERT, tableDrop, withCachedMetadata,
executeDmlAfterDrop, TABLET_NONEXISTING_ERROR);
executeDmlAfterDrop, RESOURCE_NONEXISTING_ERROR);
LOG.info("Run SELECT transactions AFTER drop");
runDmlTxnWithDropOnCurrentResource(Dml.SELECT, tableDrop, !withCachedMetadata,
executeDmlAfterDrop, RESOURCE_NONEXISTING_ERROR);
runDmlTxnWithDropOnCurrentResource(Dml.SELECT, tableDrop, withCachedMetadata,
executeDmlAfterDrop, TABLE_DELETED_ERROR);
executeDmlAfterDrop, RESOURCE_NONEXISTING_ERROR);
// For SELECT before DDL on either cached or non-cached table
// transaction should not conflict because select operation just
// picks a read time and doesn't take a distributed transaction lock.
Expand Down Expand Up @@ -339,7 +335,7 @@ public void testDmlTxnDrop() throws Exception {
executeDmlBeforeDrop, TRANSACTION_CONFLICT_ERROR);
LOG.info("Run INSERT transaction AFTER drop");
runDmlTxnWithDropOnCurrentResource(Dml.INSERT, indexDrop, withCachedMetadata,
executeDmlAfterDrop, TABLE_DELETED_ERROR);
executeDmlAfterDrop, RESOURCE_NONEXISTING_ERROR);
LOG.info("Run SELECT transaction AFTER drop");
runDmlTxnWithDropOnCurrentResource(Dml.SELECT, indexDrop, withCachedMetadata,
executeDmlAfterDrop, SCHEMA_VERSION_MISMATCH_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void testSelect() throws Exception {
getSingleRow(stmt.getResultSet());
}
updateCounter(counter);
assertEquals(1, counter.rpc.value() / queryCount);
assertEquals(2, counter.rpc.value() / queryCount);
}
}
}
111 changes: 0 additions & 111 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRpcForwarding.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testTimeout() throws Exception {
query = "SELECT count(*) FROM timeouttest";
try (ResultSet rs = statement.executeQuery(query)) {
} catch (PSQLException ex) {
if (Pattern.matches(".*RPC .* timed out after.*", ex.getMessage())) {
if (ex.getMessage().contains("canceling statement due to statement timeout")) {
LOG.info("Timeout ERROR: " + ex.getMessage());
timeoutEncountered = true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ YBCAbortTransaction()
return;

if (YBTransactionsEnabled())
YBCPgAbortTransaction();
HandleYBStatus(YBCPgAbortTransaction());
}

void
Expand Down
20 changes: 6 additions & 14 deletions src/yb/client/async_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,6 @@ void HandleExtraFields(YBqlReadOp* op, tserver::ReadRequestPB* req) {
}
}

// Overload for PGSQL read operations: when the op's read_time() evaluates to
// true, it is written into the outgoing read request via AddToPB(); otherwise
// the request is left untouched.
// NOTE(review): presumably a "false" read time means none was set on the op —
// confirm against the read-time type's bool conversion.
void HandleExtraFields(YBPgsqlReadOp* op, tserver::ReadRequestPB* req) {
if (op->read_time()) {
op->read_time().AddToPB(req);
}
}

template <class OpType, class Req, class Out>
void FillOps(
const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
Expand Down Expand Up @@ -603,10 +597,9 @@ void WriteRpc::SwapResponses() {
pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_response_batch(pgsql_idx));
const auto& pgsql_response = pgsql_op->response();
if (pgsql_response.has_rows_data_sidecar()) {
Slice rows_data = CHECK_RESULT(retrier().controller().GetSidecar(
pgsql_response.rows_data_sidecar()));
down_cast<YBPgsqlWriteOp*>(yb_op)->mutable_rows_data()->assign(
to_char_ptr(rows_data.data()), rows_data.size());
auto holder = CHECK_RESULT(
retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar()));
down_cast<YBPgsqlWriteOp*>(yb_op)->SetRowsData(holder.first, holder.second);
}
pgsql_idx++;
break;
Expand Down Expand Up @@ -751,10 +744,9 @@ void ReadRpc::SwapResponses() {
pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_batch(pgsql_idx));
const auto& pgsql_response = pgsql_op->response();
if (pgsql_response.has_rows_data_sidecar()) {
Slice rows_data = CHECK_RESULT(retrier().controller().GetSidecar(
pgsql_response.rows_data_sidecar()));
down_cast<YBPgsqlReadOp*>(yb_op)->mutable_rows_data()->assign(
rows_data.cdata(), rows_data.size());
auto holder = CHECK_RESULT(
retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar()));
down_cast<YBPgsqlReadOp*>(yb_op)->SetRowsData(holder.first, holder.second);
}
pgsql_idx++;
break;
Expand Down
5 changes: 3 additions & 2 deletions src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,11 @@ std::map<PartitionKey, Status> Batcher::CollectOpsErrors() {
const Schema& schema = GetSchema(op.yb_op->table()->schema());
const PartitionSchema& partition_schema = op.yb_op->table()->partition_schema();
const auto msg = Format(
"Row $0 not in partition $1, partition key: $2",
"Row $0 not in partition $1, partition key: $2, tablet: $3",
op.yb_op->ToString(),
partition_schema.PartitionDebugString(partition, schema),
Slice(partition_key).ToDebugHexString());
Slice(partition_key).ToDebugHexString(),
op.tablet->tablet_id());
LOG_WITH_PREFIX(DFATAL) << msg;
op.error = STATUS(InternalError, msg);
}
Expand Down
9 changes: 6 additions & 3 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@

#include "yb/yql/cql/ql/ptree/pt_option.h"

using namespace std::literals;

using yb::master::AlterTableRequestPB;
using yb::master::CreateTablegroupRequestPB;
using yb::master::CreateTablegroupResponsePB;
Expand Down Expand Up @@ -586,9 +588,10 @@ Status YBClient::TruncateTables(const vector<string>& table_ids, bool wait) {
return data_->TruncateTables(this, table_ids, deadline, wait);
}

Status YBClient::BackfillIndex(const TableId& table_id, bool wait) {
auto deadline = (CoarseMonoClock::Now()
+ MonoDelta::FromMilliseconds(FLAGS_backfill_index_client_rpc_timeout_ms));
Status YBClient::BackfillIndex(const TableId& table_id, bool wait, CoarseTimePoint deadline) {
if (deadline == CoarseTimePoint()) {
deadline = CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms;
}
return data_->BackfillIndex(this, YBTableName(), table_id, deadline, wait);
}

Expand Down
44 changes: 3 additions & 41 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,53 +96,14 @@ class TabletServerForwardServiceProxy;
}

namespace client {

namespace internal {
class ClientMasterRpcBase;
}

using GetTableLocationsCallback =
std::function<void(const Result<master::GetTableLocationsResponsePB*>&)>;

// This needs to be called by a client app before performing any operations that could result in
// logging.
void InitLogging();

//
// Installs a callback for internal client logging. It is invoked for a
// log event of any severity, across any YBClient instance.
//
// Only the first invocation has any effect; subsequent invocations are
// a no-op. The caller must ensure that 'cb' stays alive until
// UninstallLoggingCallback() is called.
//
// Before a callback is registered, all internal client log events are
// logged to stderr.
void InstallLoggingCallback(YBLoggingCallback* cb);

// Removes a callback installed via InstallLoggingCallback().
//
// Only the first invocation has any effect; subsequent invocations are
// a no-op.
//
// Should be called before unloading the client library.
void UninstallLoggingCallback();

// Set the logging verbosity of the client library. By default, this is 0. Logs become
// progressively more verbose as the level is increased. Empirically, the highest
// verbosity level used in YB is 6, which includes very fine-grained tracing
// information. Most useful logging is enabled at level 1 or 2, with the higher levels
// used only in rare circumstances.
//
// Logs are emitted to stderr, or to the configured log callback at SEVERITY_INFO.
//
// This may be called safely at any point during usage of the library.
void SetVerboseLogLevel(int level);

// The YB client library uses signals internally in some cases. By default, it uses
// SIGUSR2. If your application makes use of SIGUSR2, this advanced API can help
// workaround conflicts.
Status SetInternalSignalNumber(int signum);

using MasterAddressSource = std::function<std::vector<std::string>()>;

struct TransactionStatusTablets {
Expand Down Expand Up @@ -286,7 +247,8 @@ class YBClient {
CHECKED_STATUS TruncateTables(const std::vector<std::string>& table_ids, bool wait = true);

// Backfill the specified index table. This is only supported for YSQL at the moment.
CHECKED_STATUS BackfillIndex(const TableId& table_id, bool wait = true);
CHECKED_STATUS BackfillIndex(const TableId& table_id, bool wait = true,
CoarseTimePoint deadline = CoarseTimePoint());

// Delete the specified table.
// Set 'wait' to true if the call must wait for the table to be fully deleted before returning.
Expand Down
11 changes: 10 additions & 1 deletion src/yb/client/meta_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,15 @@ void LookupRpc::SendRpc() {
return;
}

// See YBClient::Data::SyncLeaderMasterRpc().
auto now = CoarseMonoClock::Now();
if (retrier().deadline() < now) {
Finished(STATUS_FORMAT(TimedOut, "Timed out after deadline expired, passed: $0",
MonoDelta(now - retrier().start())));
return;
}
mutable_retrier()->PrepareController();

ClientMasterRpcBase::SendRpc();
}

Expand Down Expand Up @@ -1996,7 +2005,7 @@ void MetaCache::LookupTabletByKey(const std::shared_ptr<YBTable>& table,
const auto table_partition_list = table->GetVersionedPartitions();
const auto partition_start = client::FindPartitionStart(table_partition_list, partition_key);
VLOG_WITH_PREFIX_AND_FUNC(5) << "Table: " << table->ToString()
<< ", partition_list_version: " << table_partition_list->version
<< ", table_partition_list: " << table_partition_list->ToString()
<< ", partition_key: " << Slice(partition_key).ToDebugHexString()
<< ", partition_start: " << Slice(*partition_start).ToDebugHexString();

Expand Down
6 changes: 6 additions & 0 deletions src/yb/client/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ PartitionKeyPtr FindPartitionStart(
return PartitionKeyPtr(versioned_partitions, &versioned_partitions->keys[idx]);
}

// Renders this partition list for logging as "{ version: <version> keys: <keys> }",
// where each partition key is printed with Slice::ToDebugHexString().
std::string VersionedTablePartitionList::ToString() const {
  const auto keys_str = CollectionToString(
      keys, [](const Slice& partition_key) { return partition_key.ToDebugHexString(); });
  return Format("{ version: $0 keys: $1 }", version, keys_str);
}

} // namespace client
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/client/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct VersionedTablePartitionList {
TablePartitionList keys;
// See SysTablesEntryPB::partition_list_version.
PartitionListVersion version;

std::string ToString() const;
};

typedef Result<VersionedTablePartitionListPtr> FetchPartitionsResult;
Expand Down
Loading

0 comments on commit c5f5125

Please sign in to comment.