Skip to content

Commit

Permalink
[#11733] DST: PITR - Data race on enabling snapshot throttling by def…
Browse files Browse the repository at this point in the history
…ault

Summary:
The snapshot coordinator periodically refreshes the snapshot throttling value. As part of this refresh, it queries the number of tservers and therefore reads the cluster config. If the cluster config is being written to during this refresh, there is a data race, because the config is currently unguarded. One scenario in which this can happen is after a Restore, when we load the sys catalog.

To fix this, we need to guard `cluster_config_` with a lock and introduce error handling for the case where the config is not found.

Test Plan: ybd --cxx-test yb-admin-snapshot-schedule-test --gtest_filter YbAdminSnapshotScheduleTest.CatalogLoadRace

Reviewers: sergei, bogdan

Reviewed By: bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D15612
  • Loading branch information
sanketkedia committed Mar 13, 2022
1 parent 4eeb5d4 commit 79e1f12
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 106 deletions.
6 changes: 1 addition & 5 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
return snapshot_coordinator_;
}

size_t GetNumLiveTServersForActiveCluster() override;
Result<size_t> GetNumLiveTServersForActiveCluster() override;

private:
friend class SnapshotLoader;
Expand Down Expand Up @@ -341,10 +341,6 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
const TSHeartbeatRequestPB* req,
TSHeartbeatResponsePB* resp);

scoped_refptr<ClusterConfigInfo> GetClusterConfigInfo() const {
return cluster_config_;
}

// Helper functions for GetTableSchemaCallback, GetTablegroupSchemaCallback
// and GetColocatedTabletSchemaCallback.

Expand Down
61 changes: 36 additions & 25 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1073,14 +1073,15 @@ Status CatalogManager::ImportSnapshotMeta(const ImportSnapshotMetaRequestPB* req

Status CatalogManager::ChangeEncryptionInfo(const ChangeEncryptionInfoRequestPB* req,
ChangeEncryptionInfoResponsePB* resp) {
auto l = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto l = cluster_config->LockForWrite();
auto encryption_info = l.mutable_data()->pb.mutable_encryption_info();

RETURN_NOT_OK(encryption_manager_->ChangeEncryptionInfo(req, encryption_info));

l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
RETURN_NOT_OK(CheckStatus(
sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"updating cluster config in sys-catalog"));
l.Commit();

Expand All @@ -1095,7 +1096,7 @@ Status CatalogManager::ChangeEncryptionInfo(const ChangeEncryptionInfoRequestPB*
Status CatalogManager::IsEncryptionEnabled(const IsEncryptionEnabledRequestPB* req,
IsEncryptionEnabledResponsePB* resp) {
return encryption_manager_->IsEncryptionEnabled(
cluster_config_->LockForRead()->pb.encryption_info(), resp);
ClusterConfig()->LockForRead()->pb.encryption_info(), resp);
}

Status CatalogManager::ImportNamespaceEntry(const SysRowEntry& entry,
Expand Down Expand Up @@ -3068,7 +3069,7 @@ Status CatalogManager::SetupUniverseReplication(const SetupUniverseReplicationRe
}

{
auto l = cluster_config_->LockForRead();
auto l = ClusterConfig()->LockForRead();
if (l->pb.cluster_uuid() == req->producer_id()) {
return STATUS(InvalidArgument, "The request UUID and cluster UUID are identical.",
req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
Expand Down Expand Up @@ -3794,7 +3795,8 @@ Status CatalogManager::UpdateXClusterConsumerOnTabletSplit(
return Status::OK();
}

auto l = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto l = cluster_config->LockForWrite();
for (const auto& stream_info : stream_infos) {
std::string universe_id = stream_info.first;
CDCStreamId stream_id = stream_info.second;
Expand Down Expand Up @@ -3823,7 +3825,7 @@ Status CatalogManager::UpdateXClusterConsumerOnTabletSplit(
// Also bump the cluster_config_ version so that changes are propagated to tservers.
l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);

RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"Updating cluster config in sys-catalog"));
l.Commit();

Expand Down Expand Up @@ -3908,7 +3910,8 @@ Status CatalogManager::InitCDCConsumer(
HostPortToPB(addr, producer_entry.add_tserver_addrs());
}

auto l = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto l = cluster_config->LockForWrite();
auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto it = producer_map->find(producer_universe_uuid);
if (it != producer_map->end()) {
Expand All @@ -3919,7 +3922,7 @@ Status CatalogManager::InitCDCConsumer(
(*producer_map)[producer_universe_uuid] = std::move(producer_entry);
l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
RETURN_NOT_OK(CheckStatus(
sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"updating cluster config in sys-catalog"));
l.Commit();

Expand Down Expand Up @@ -3949,7 +3952,8 @@ void CatalogManager::MergeUniverseReplication(scoped_refptr<UniverseReplicationI
}
// Merge Cluster Config for TServers.
{
auto cl = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto cl = cluster_config->LockForWrite();
auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto original_producer_entry = pm->find(original_universe->id());
auto alter_producer_entry = pm->find(universe->id());
Expand All @@ -3963,7 +3967,7 @@ void CatalogManager::MergeUniverseReplication(scoped_refptr<UniverseReplicationI
LOG(WARNING) << "Could not find both universes in Cluster Config: " << universe->id();
}
cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
const Status s = sys_catalog_->Upsert(leader_ready_term(), cluster_config_);
const Status s = sys_catalog_->Upsert(leader_ready_term(), cluster_config.get());
cl.CommitOrWarn(s, "updating cluster config in sys-catalog");
}
// Merge Master Config on Consumer. (no need for Producer changes, since it uses stream_id)
Expand Down Expand Up @@ -4038,14 +4042,15 @@ Status CatalogManager::DeleteUniverseReplication(const DeleteUniverseReplication
// Delete subscribers on the Consumer Registry (removes from TServers).
LOG(INFO) << "Deleting subscribers for producer " << req->producer_id();
{
auto cl = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto cl = cluster_config->LockForWrite();
auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto it = producer_map->find(req->producer_id());
if (it != producer_map->end()) {
producer_map->erase(it);
cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
RETURN_NOT_OK(CheckStatus(
sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"updating cluster config in sys-catalog"));
cl.Commit();
}
Expand Down Expand Up @@ -4184,7 +4189,8 @@ Status CatalogManager::SetUniverseReplicationEnabled(

// Modify the Consumer Registry, which will fan out this info to all TServers on heartbeat.
{
auto l = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto l = cluster_config->LockForWrite();
auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto it = producer_map->find(req->producer_id());
if (it == producer_map->end()) {
Expand All @@ -4195,7 +4201,7 @@ Status CatalogManager::SetUniverseReplicationEnabled(
(*it).second.set_disable_stream(!req->is_enabled());
l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
RETURN_NOT_OK(CheckStatus(
sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"updating cluster config in sys-catalog"));
l.Commit();
}
Expand Down Expand Up @@ -4254,7 +4260,8 @@ Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRe
}
// 1b. Persistent Config: Update the Consumer Registry (updates TServers)
{
auto l = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto l = cluster_config->LockForWrite();
auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto it = producer_map->find(req->producer_id());
if (it == producer_map->end()) {
Expand All @@ -4265,7 +4272,7 @@ Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRe
(*it).second.mutable_master_addrs()->CopyFrom(req->producer_master_addresses());
l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
RETURN_NOT_OK(CheckStatus(
sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"updating cluster config in sys-catalog"));
l.Commit();
}
Expand Down Expand Up @@ -4294,7 +4301,8 @@ Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRe
vector<CDCStreamId> streams_to_remove;
// 1. Update the Consumer Registry (removes from TServers).
{
auto cl = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto cl = cluster_config->LockForWrite();
auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto producer_entry = pm->find(req->producer_id());
if (producer_entry != pm->end()) {
Expand Down Expand Up @@ -4323,7 +4331,7 @@ Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRe
}
cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
RETURN_NOT_OK(CheckStatus(
sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"updating cluster config in sys-catalog"));
cl.Commit();
}
Expand Down Expand Up @@ -4508,7 +4516,8 @@ Status CatalogManager::RenameUniverseReplication(
metadata->set_producer_id(new_producer_universe_id);

// Also need to update internal maps.
auto cl = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto cl = cluster_config->LockForWrite();
auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
(*producer_map)[new_producer_universe_id] =
std::move((*producer_map)[old_universe_replication_id]);
Expand All @@ -4520,7 +4529,7 @@ Status CatalogManager::RenameUniverseReplication(
RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_DELETE, universe.get()));
RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_UPDATE,
new_ri.get(),
cluster_config_.get()));
cluster_config.get()));
RETURN_NOT_OK(CheckStatus(
sys_catalog_->SyncWrite(w.get()),
"Updating universe replication info and cluster config in sys-catalog"));
Expand Down Expand Up @@ -4653,7 +4662,8 @@ Status CatalogManager::UpdateConsumerOnProducerSplit(
req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
}

auto l = cluster_config_->LockForWrite();
auto cluster_config = ClusterConfig();
auto l = cluster_config->LockForWrite();
auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
auto producer_entry = FindOrNull(*producer_map, req->producer_id());
if (!producer_entry) {
Expand Down Expand Up @@ -4725,7 +4735,7 @@ Status CatalogManager::UpdateConsumerOnProducerSplit(
// pollers are created for the new tablets).
l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);

RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
"Updating cluster config in sys-catalog"));
l.Commit();

Expand Down Expand Up @@ -4794,10 +4804,11 @@ void CatalogManager::SysCatalogLoaded(int64_t term) {
return snapshot_coordinator_.SysCatalogLoaded(term);
}

size_t CatalogManager::GetNumLiveTServersForActiveCluster() {
BlacklistSet blacklist = BlacklistSetFromPB();
Result<size_t> CatalogManager::GetNumLiveTServersForActiveCluster() {
BlacklistSet blacklist = VERIFY_RESULT(BlacklistSetFromPB());
TSDescriptorVector ts_descs;
master_->ts_manager()->GetAllLiveDescriptorsInCluster(&ts_descs, placement_uuid(), blacklist);
auto uuid = VERIFY_RESULT(placement_uuid());
master_->ts_manager()->GetAllLiveDescriptorsInCluster(&ts_descs, uuid, blacklist);
return ts_descs.size();
}

Expand Down
9 changes: 2 additions & 7 deletions src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,21 +784,16 @@ struct PersistentClusterConfigInfo : public Persistent<SysClusterConfigEntryPB,

// This is the in memory representation of the cluster config information serialized proto data,
// using metadata() for CowObject access.
class ClusterConfigInfo : public RefCountedThreadSafe<ClusterConfigInfo>,
public MetadataCowWrapper<PersistentClusterConfigInfo> {
class ClusterConfigInfo : public MetadataCowWrapper<PersistentClusterConfigInfo> {
public:
ClusterConfigInfo() {}
~ClusterConfigInfo() = default;

virtual const std::string& id() const override { return fake_id_; }

private:
friend class RefCountedThreadSafe<ClusterConfigInfo>;
~ClusterConfigInfo() = default;

// We do not use the ID field in the sys_catalog table.
const std::string fake_id_;

DISALLOW_COPY_AND_ASSIGN(ClusterConfigInfo);
};

struct PersistentRedisConfigInfo
Expand Down
14 changes: 11 additions & 3 deletions src/yb/master/catalog_loaders.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@
#include "yb/master/master_util.h"
#include "yb/master/ysql_transaction_ddl.h"

#include "yb/util/flag_tags.h"
#include "yb/util/status_format.h"
#include "yb/util/status_log.h"

DEFINE_bool(master_ignore_deleted_on_load, true,
"Whether the Master should ignore deleted tables & tablets on restart. "
"This reduces failover time at the expense of garbage data." );

DEFINE_test_flag(uint64, slow_cluster_config_load_secs, 0,
"When set, it pauses load of cluster config during sys catalog load.");
TAG_FLAG(TEST_slow_cluster_config_load_secs, runtime);

namespace yb {
namespace master {

Expand Down Expand Up @@ -413,18 +418,21 @@ Status UDTypeLoader::Visit(const UDTypeId& udtype_id, const SysUDTypeEntryPB& me

Status ClusterConfigLoader::Visit(
const std::string& unused_id, const SysClusterConfigEntryPB& metadata) {
if (FLAGS_TEST_slow_cluster_config_load_secs > 0) {
SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_slow_cluster_config_load_secs));
}
// Debug confirm that there is no cluster_config_ set. This also ensures that this does not
// visit multiple rows. Should update this, if we decide to have multiple IDs set as well.
std::lock_guard<decltype(catalog_manager_->config_mutex_)> config_lock(
catalog_manager_->config_mutex_);
DCHECK(!catalog_manager_->cluster_config_) << "Already have config data!";

// Prepare the config object.
ClusterConfigInfo* config = new ClusterConfigInfo();
std::shared_ptr<ClusterConfigInfo> config = std::make_shared<ClusterConfigInfo>();
{
auto l = config->LockForWrite();
l.mutable_data()->pb.CopyFrom(metadata);



// Update in memory state.
catalog_manager_->cluster_config_ = config;
l.Commit();
Expand Down
Loading

0 comments on commit 79e1f12

Please sign in to comment.