From 8c999b6b32c5a6d70bf5a9936884ccd46f9547e3 Mon Sep 17 00:00:00 2001 From: deeps1991 Date: Wed, 23 Sep 2020 16:21:55 -0700 Subject: [PATCH] =?UTF-8?q?Allow=20setting=20a=20custom=20placement=20poli?= =?UTF-8?q?cy=20per=20table=20through=20yb-admin=5Fcl=E2=80=A6=20(#5368)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Allow setting a custom placement policy per table through yb-admin_cli that overrides the cluster placement policy. * Addressed Vivek and Rahul's comments * Addressed Rahul's comments * Lint changes. Co-authored-by: Rahul Desirazu --- src/yb/client/client-internal.cc | 3 + src/yb/client/table.cc | 4 + src/yb/client/table.h | 4 + src/yb/client/table_alterer.cc | 13 +- src/yb/client/table_alterer.h | 7 +- src/yb/integration-tests/CMakeLists.txt | 1 + .../load_balancer_placement_policy-test.cc | 396 ++++++++++++++++++ src/yb/master/catalog_manager.cc | 117 ++++-- src/yb/master/catalog_manager.h | 11 +- src/yb/master/master.proto | 6 + src/yb/tools/yb-admin-test.cc | 123 ++++++ src/yb/tools/yb-admin_cli.cc | 39 ++ src/yb/tools/yb-admin_client.cc | 63 +++ src/yb/tools/yb-admin_client.h | 5 + 14 files changed, 757 insertions(+), 35 deletions(-) create mode 100644 src/yb/integration-tests/load_balancer_placement_policy-test.cc diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 94a083b03079..6beb0c3cf985 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -1344,6 +1344,9 @@ void GetTableSchemaRpc::Finished(const Status& status) { if (resp_.has_index_info()) { info_->index_info.emplace(resp_.index_info()); } + if (resp_.has_replication_info()) { + info_->replication_info.emplace(resp_.replication_info()); + } CHECK_GT(info_->table_id.size(), 0) << "Running against a too-old master"; info_->colocated = resp_.colocated(); } diff --git a/src/yb/client/table.cc b/src/yb/client/table.cc index 3774ee5e76bb..73267c3666e3 100644 --- a/src/yb/client/table.cc +++ b/src/yb/client/table.cc @@ -131,6 +131,10 @@ const bool YBTable::colocated() const { return info_.colocated; } +const boost::optional& YBTable::replication_info() const { + return info_.replication_info; +} + std::string YBTable::ToString() const { return strings::Substitute( "$0 $1 IndexInfo: $2 IndexMap $3", (IsIndex() ? "Index Table" : "Normal Table"), id(), diff --git a/src/yb/client/table.h b/src/yb/client/table.h index d2bdfd58912d..737a14c9db2c 100644 --- a/src/yb/client/table.h +++ b/src/yb/client/table.h @@ -47,6 +47,7 @@ struct YBTableInfo { boost::optional index_info; YBTableType table_type; bool colocated; + boost::optional replication_info; }; // A YBTable represents a table on a particular cluster. It holds the current @@ -101,6 +102,9 @@ class YBTable : public std::enable_shared_from_this { // Is the table colocated? const bool colocated() const; + // Returns the replication info for the table. 
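+  // Unset (boost::none) when the table has no table-level replication info, in which case the
+  // cluster-level placement policy applies.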
+ const boost::optional& replication_info() const; + std::string ToString() const; //------------------------------------------------------------------------------------------------ // CQL support diff --git a/src/yb/client/table_alterer.cc b/src/yb/client/table_alterer.cc index 9dbb3839652c..43ae694d0c64 100644 --- a/src/yb/client/table_alterer.cc +++ b/src/yb/client/table_alterer.cc @@ -59,6 +59,11 @@ YBTableAlterer* YBTableAlterer::SetTableProperties(const TableProperties& table_ return this; } +YBTableAlterer* YBTableAlterer::replication_info(const master::ReplicationInfoPB& ri) { + replication_info_.emplace(ri); + return this; +} + YBTableAlterer* YBTableAlterer::SetWalRetentionSecs(const uint32_t wal_retention_secs) { wal_retention_secs_ = wal_retention_secs; return this; @@ -97,7 +102,8 @@ Status YBTableAlterer::ToRequest(master::AlterTableRequestPB* req) { return status_; } - if (!rename_to_ && steps_.empty() && !table_properties_ && !wal_retention_secs_) { + if (!rename_to_ && steps_.empty() && !table_properties_ && !wal_retention_secs_ && + !replication_info_) { return STATUS(InvalidArgument, "No alter steps provided"); } @@ -169,6 +175,11 @@ Status YBTableAlterer::ToRequest(master::AlterTableRequestPB* req) { req->set_wal_retention_secs(*wal_retention_secs_); } + if (replication_info_) { + // TODO: Maybe add checks for the sanity of the replication_info. + req->mutable_replication_info()->CopyFrom(replication_info_.get()); + } + return Status::OK(); } diff --git a/src/yb/client/table_alterer.h b/src/yb/client/table_alterer.h index ca7bc39222b3..e65a1b1c684a 100644 --- a/src/yb/client/table_alterer.h +++ b/src/yb/client/table_alterer.h @@ -62,7 +62,7 @@ class YBTableAlterer { YBTableAlterer* SetWalRetentionSecs(const uint32_t wal_retention_secs); - // Set the timeout for the operation. This includes any waiting + // Set the timeout for the operation. This includes any waiting // after the alter has been submitted (i.e if the alter is slow // to be performed on a large table, it may time out and then // later be successful). @@ -73,6 +73,9 @@ class YBTableAlterer { // If not provided, defaults to true. YBTableAlterer* wait(bool wait); + // Set replication info for the table. + YBTableAlterer* replication_info(const master::ReplicationInfoPB& ri); + // Alters the table. // // The return value may indicate an error in the alter operation, or a @@ -111,6 +114,8 @@ class YBTableAlterer { boost::optional wal_retention_secs_; + boost::optional replication_info_; + DISALLOW_COPY_AND_ASSIGN(YBTableAlterer); }; diff --git a/src/yb/integration-tests/CMakeLists.txt b/src/yb/integration-tests/CMakeLists.txt index 04ee9b54ada3..c194fac6c2e6 100644 --- a/src/yb/integration-tests/CMakeLists.txt +++ b/src/yb/integration-tests/CMakeLists.txt @@ -128,6 +128,7 @@ ADD_YB_TEST(log_version-test) ADD_YB_TEST(load_balancer-test) ADD_YB_TEST(load_balancer_multi_table-test) ADD_YB_TEST(load_balancer_respect_affinity-test) +ADD_YB_TEST(load_balancer_placement_policy-test) set(YB_TEST_LINK_LIBS_SAVED ${YB_TEST_LINK_LIBS}) set(YB_TEST_LINK_LIBS ${YB_TEST_LINK_LIBS} cassandra cql_test_util) diff --git a/src/yb/integration-tests/load_balancer_placement_policy-test.cc b/src/yb/integration-tests/load_balancer_placement_policy-test.cc new file mode 100644 index 000000000000..3bdb1d092e78 --- /dev/null +++ b/src/yb/integration-tests/load_balancer_placement_policy-test.cc @@ -0,0 +1,396 @@ +// Copyright (c) YugaByte, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include + +#include "yb/integration-tests/yb_table_test_base.h" + +#include "yb/client/schema.h" +#include "yb/client/table.h" +#include "yb/client/table_creator.h" +#include "yb/client/yb_table_name.h" +#include "yb/gutil/strings/join.h" +#include "yb/integration-tests/cluster_verifier.h" +#include "yb/integration-tests/external_mini_cluster.h" +#include "yb/integration-tests/test_workload.h" +#include "yb/master/master.h" +#include "yb/master/master.proxy.h" +#include "yb/rpc/rpc_controller.h" +#include "yb/tools/yb-admin_client.h" + +using namespace std::literals; + +namespace yb { +namespace integration_tests { + +const auto kDefaultTimeout = 30000ms; + +class LoadBalancerPlacementPolicyTest : public YBTableTestBase { + protected: + void SetUp() override { + YBTableTestBase::SetUp(); + + yb_admin_client_ = std::make_unique( + external_mini_cluster()->GetMasterAddresses(), kDefaultTimeout); + + ASSERT_OK(yb_admin_client_->Init()); + } + + bool use_external_mini_cluster() override { return true; } + + int num_tablets() override { + return 4; + } + + bool enable_ysql() override { + // Do not create the transaction status table. + return false; + } + + void GetLoadOnTservers(const string tablename, + int num_tservers, + vector *const out_load_per_tserver) { + out_load_per_tserver->clear(); + for (int ii = 0; ii < num_tservers; ++ii) { + const int count = ASSERT_RESULT(GetLoadOnTserver( + external_mini_cluster()->tablet_server(ii), tablename)); + out_load_per_tserver->emplace_back(count); + } + } + + Result GetLoadOnTserver(ExternalTabletServer* server, const string tablename) { + auto proxy = VERIFY_RESULT(GetMasterLeaderProxy()); + master::GetTableLocationsRequestPB req; + req.mutable_table()->set_table_name(tablename); + req.mutable_table()->mutable_namespace_()->set_name(table_name().namespace_name()); + master::GetTableLocationsResponsePB resp; + + rpc::RpcController rpc; + rpc.set_timeout(kDefaultTimeout); + RETURN_NOT_OK(proxy->GetTableLocations(req, &resp, &rpc)); + + uint32_t count = 0; + std::vector replicas; + for (const auto& loc : resp.tablet_locations()) { + for (const auto& replica : loc.replicas()) { + if (replica.ts_info().permanent_uuid() == server->instance_id().permanent_uuid()) { + replicas.push_back(loc.tablet_id()); + count++; + } + } + } + LOG(INFO) << Format("For ts $0, table name $1 tablet count $2", + server->instance_id().permanent_uuid(), tablename, count); + return count; + } + + Result> GetMasterLeaderProxy() { + int idx; + RETURN_NOT_OK(external_mini_cluster()->GetLeaderMasterIndex(&idx)); + return external_mini_cluster()->master_proxy(idx); + } + + void CustomizeExternalMiniCluster(ExternalMiniClusterOptions* opts) override { + opts->extra_tserver_flags.push_back("--placement_cloud=c"); + opts->extra_tserver_flags.push_back("--placement_region=r"); + opts->extra_tserver_flags.push_back("--placement_zone=z${index}"); + 
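+    // Each of the initial tservers gets its own zone (z0, z1, z2) via ${index}. The master flags
+    // below speed up rebalancing and unresponsive-tserver detection for the blacklist scenarios
+    // in these tests (flag semantics assumed from the flag names).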
opts->extra_master_flags.push_back("--load_balancer_skip_leader_as_remove_victim=false"); + opts->extra_master_flags.push_back("--tserver_unresponsive_timeout_ms=5000"); + } + + void WaitForLoadBalancer() { + ASSERT_OK(WaitFor([&]() -> Result { + bool is_idle = VERIFY_RESULT(client_->IsLoadBalancerIdle()); + return !is_idle; + }, kDefaultTimeout * 2, "IsLoadBalancerActive")); + + ASSERT_OK(WaitFor([&]() -> Result { + return client_->IsLoadBalancerIdle(); + }, kDefaultTimeout * 4, "IsLoadBalancerIdle")); + } + + void AddNewTserverToZone( + const string& zone, + const int expected_num_tservers, + const string& placement_uuid = "") { + + std::vector extra_opts; + extra_opts.push_back("--placement_cloud=c"); + extra_opts.push_back("--placement_region=r"); + extra_opts.push_back("--placement_zone=" + zone); + + if (!placement_uuid.empty()) { + extra_opts.push_back("--placement_uuid=" + placement_uuid); + } + + ASSERT_OK(external_mini_cluster()->AddTabletServer(true, extra_opts)); + ASSERT_OK(external_mini_cluster()->WaitForTabletServerCount(expected_num_tservers, + kDefaultTimeout)); + } + + std::unique_ptr yb_admin_client_; +}; + +TEST_F(LoadBalancerPlacementPolicyTest, CreateTableWithPlacementPolicyTest) { + // Set cluster placement policy. + ASSERT_OK(yb_admin_client_->ModifyPlacementInfo("c.r.z0,c.r.z1,c.r.z2", 3, "")); + + const string& create_custom_policy_table = "creation-placement-test"; + const yb::client::YBTableName placement_table( + YQL_DATABASE_CQL, table_name().namespace_name(), create_custom_policy_table); + + yb::client::YBSchemaBuilder b; + yb::client::YBSchema schema; + b.AddColumn("k")->Type(BINARY)->NotNull()->HashPrimaryKey(); + ASSERT_OK(b.Build(&schema)); + + // Set placement policy for the new table that is different from the cluster placement policy. + master::ReplicationInfoPB replication_info; + replication_info.mutable_live_replicas()->set_num_replicas(2); + auto* placement_block = replication_info.mutable_live_replicas()->add_placement_blocks(); + auto* cloud_info = placement_block->mutable_cloud_info(); + cloud_info->set_placement_cloud("c"); + cloud_info->set_placement_region("r"); + cloud_info->set_placement_zone("z1"); + placement_block->set_min_num_replicas(1); + + placement_block = replication_info.mutable_live_replicas()->add_placement_blocks(); + cloud_info = placement_block->mutable_cloud_info(); + cloud_info->set_placement_cloud("c"); + cloud_info->set_placement_region("r"); + cloud_info->set_placement_zone("z2"); + placement_block->set_min_num_replicas(1); + + ASSERT_OK(NewTableCreator()->table_name(placement_table).schema(&schema).replication_info( + replication_info).Create()); + + vector counts_per_ts; + int64 num_tservers = num_tablet_servers(); + GetLoadOnTservers(create_custom_policy_table, num_tservers, &counts_per_ts); + // Verify that the tserver in zone0 does not have any tablets assigned to it. + ASSERT_EQ(counts_per_ts[0], 0); + // Verify that the tservers in z1 and z2 have tablets assigned to them. + ASSERT_EQ(counts_per_ts[1], 4); + ASSERT_EQ(counts_per_ts[2], 4); + + // Verify that modifying the placement info for a table with custom placement + // policy works as expected. + ASSERT_OK(yb_admin_client_->ModifyTablePlacementInfo( + placement_table, "c.r.z0,c.r.z1,c.r.z2", 3, "")); + WaitForLoadBalancer(); + + // The replication factor increased to 3, and the placement info now has all 3 zones. + // Thus, all tservers should have 4 tablets. 
+ GetLoadOnTservers(create_custom_policy_table, num_tservers, &counts_per_ts); + for (int ii = 0; ii < 3; ++ii) { + ASSERT_EQ(counts_per_ts[ii], 4); + } +} + +TEST_F(LoadBalancerPlacementPolicyTest, PlacementPolicyTest) { + // Set cluster placement policy. + ASSERT_OK(yb_admin_client_->ModifyPlacementInfo("c.r.z0,c.r.z1,c.r.z2", 3, "")); + + // Add a new tserver to zone 1. + int num_tservers = num_tablet_servers() + 1; + AddNewTserverToZone("z1", num_tservers); + + WaitForLoadBalancer(); + + // Create another table for which we will set custom placement info. + const string& custom_policy_table = "placement-test"; + const yb::client::YBTableName placement_table( + YQL_DATABASE_CQL, table_name().namespace_name(), custom_policy_table); + ASSERT_OK(client_->CreateNamespaceIfNotExists( + placement_table.namespace_name(), + placement_table.namespace_type())); + + yb::client::YBSchemaBuilder b; + yb::client::YBSchema schema; + b.AddColumn("k")->Type(BINARY)->NotNull()->HashPrimaryKey(); + b.AddColumn("v")->Type(BINARY)->NotNull(); + ASSERT_OK(b.Build(&schema)); + + ASSERT_OK(NewTableCreator()->table_name(placement_table).schema(&schema).Create()); + + WaitForLoadBalancer(); + + // Modify the placement info for the table. + ASSERT_OK(yb_admin_client_->ModifyTablePlacementInfo(placement_table, "c.r.z1,c.r.z2", 3, "")); + + WaitForLoadBalancer(); + + // Test 1: Verify placement of tablets for the table with modified placement info. + vector counts_per_ts; + GetLoadOnTservers(custom_policy_table, num_tservers, &counts_per_ts); + // ts0 in c.r.z0 should have no tablets in it. + ASSERT_EQ(counts_per_ts[0], 0); + // The other tablet servers should have tablets spread equally. + ASSERT_EQ(counts_per_ts[1], counts_per_ts[2]); + ASSERT_EQ(counts_per_ts[2], counts_per_ts[3]); + + // The table with cluster placement policy should have tablets spread across all tservers. + GetLoadOnTservers(table_name().table_name(), num_tservers, &counts_per_ts); + for (int ii = 0; ii < num_tservers; ++ii) { + ASSERT_GT(counts_per_ts[ii], 0); + } + + // Test 2: Verify that custom placement info is honored when tservers are added. + // Add two new tservers in both z0 and z2. + ++num_tservers; + AddNewTserverToZone("z0", num_tservers); + + ++num_tservers; + AddNewTserverToZone("z2", num_tservers); + + WaitForLoadBalancer(); + + GetLoadOnTservers(custom_policy_table, num_tservers, &counts_per_ts); + for (int ii = 0; ii < num_tservers; ++ii) { + if (ii == 0 || ii == 4) { + // The table with custom policy should have no tablets in z0, i.e. ts0 and ts4. + ASSERT_EQ(counts_per_ts[ii], 0); + continue; + } + // The other tablet servers should have tablets in them. + ASSERT_GT(counts_per_ts[ii], 0); + } + + // Test 3: Verify that custom placement info is honored when tservers are removed. + ASSERT_OK(external_mini_cluster()->AddTServerToBlacklist( + external_mini_cluster()->master(), + external_mini_cluster()->tablet_server(4))); + ASSERT_OK(external_mini_cluster()->AddTServerToBlacklist( + external_mini_cluster()->master(), + external_mini_cluster()->tablet_server(5))); + WaitForLoadBalancer(); + + num_tservers -= 2; + GetLoadOnTservers(custom_policy_table, num_tservers, &counts_per_ts); + // ts0 in c.r.z0 should have no tablets in it. + ASSERT_EQ(counts_per_ts[0], 0); + // The other tablet servers should have tablets spread equally. 
+ ASSERT_EQ(counts_per_ts[1], counts_per_ts[2]); + ASSERT_EQ(counts_per_ts[2], counts_per_ts[3]); + + // The table with cluster placement policy should continue to have tablets spread across all + // tservers. + GetLoadOnTservers(table_name().table_name(), num_tservers, &counts_per_ts); + for (int ii = 0; ii < num_tservers; ++ii) { + ASSERT_GT(counts_per_ts[ii], 0); + } +} + +TEST_F(LoadBalancerPlacementPolicyTest, AlterPlacementDataConsistencyTest) { + // Set cluster placement policy. + ASSERT_OK(yb_admin_client_->ModifyPlacementInfo("c.r.z0,c.r.z1", 2, "")); + + // Start workload on a table. + const string& table = "placement-data-consistency-test"; + const yb::client::YBTableName placement_table( + YQL_DATABASE_CQL, table_name().namespace_name(), table); + + TestWorkload workload(external_mini_cluster()); + workload.set_table_name(placement_table); + workload.Setup(); + workload.Start(); + + // Change its placement policy such that it now has additional replicas spanning additional + // tservers. + ASSERT_OK(yb_admin_client_->ModifyTablePlacementInfo( + placement_table, "c.r.z0,c.r.z1,c.r.z2", 3, "")); + WaitForLoadBalancer(); + + // Verify that the placement policy is honored. + vector counts_per_ts; + GetLoadOnTservers(table, num_tablet_servers(), &counts_per_ts); + for (int ii = 0; ii < 3; ++ii) { + ASSERT_EQ(counts_per_ts[ii], 1); + } + + // Change placement policy such that it now spans lesser replicas spanning fewer tservers. + ASSERT_OK(yb_admin_client_->ModifyTablePlacementInfo(placement_table, "c.r.z0", 1, "")); + WaitForLoadBalancer(); + + // Verify that placement policy is honored. + GetLoadOnTservers(table, num_tablet_servers(), &counts_per_ts); + // The table is RF1 and confined to zone 0. Ts0 should have 1 tablet. + // The other two tablet servers should not have any tablets. + ASSERT_EQ(counts_per_ts[0], 1); + ASSERT_EQ(counts_per_ts[1], 0); + ASSERT_EQ(counts_per_ts[2], 0); + + // Verify that the data inserted is still sane. + workload.StopAndJoin(); + int rows_inserted = workload.rows_inserted(); + LOG(INFO) << "Number of rows inserted: " << rows_inserted; + + // Verify that number of rows is as expected. + ClusterVerifier cluster_verifier(external_mini_cluster()); + ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); + ASSERT_NO_FATALS(cluster_verifier.CheckRowCount( + placement_table, ClusterVerifier::EXACTLY, rows_inserted)); +} + +TEST_F(LoadBalancerPlacementPolicyTest, ModifyPlacementUUIDTest) { + // Set cluster placement policy. + ASSERT_OK(yb_admin_client_->ModifyPlacementInfo("c.r.z0,c.r.z1,c.r.z2", 3, "")); + + // Add 2 tservers with custom placement uuid. + int num_tservers = num_tablet_servers() + 1; + const string& random_placement_uuid = "19dfa091-2b53-434f-b8dc-97280a5f8831"; + AddNewTserverToZone("z1", num_tservers, random_placement_uuid); + AddNewTserverToZone("z2", ++num_tservers, random_placement_uuid); + + vector counts_per_ts; + GetLoadOnTservers(table_name().table_name(), num_tservers, &counts_per_ts); + + // The first 3 tservers should have equal number of tablets allocated to them, but the new + // tservers should not. + for (int ii = 0; ii < 3; ++ii) { + ASSERT_EQ(counts_per_ts[ii], 4); + } + ASSERT_EQ(counts_per_ts[3], 0); + ASSERT_EQ(counts_per_ts[4], 0); + + // Now there are 2 tservers with custom placement_uuid and 3 tservers with default placement_uuid. + // Modify the cluster config to have new placement_uuid matching the new tservers. 
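+  // This step matters because ValidateTableReplicationInfo() only accepts a table-level
+  // placement_uuid that matches the cluster's live placement_uuid.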
+ ASSERT_OK(yb_admin_client_->ModifyPlacementInfo( + "c.r.z0,c.r.z1,c.r.z2", 2, random_placement_uuid)); + + // Change the table placement policy and verify that the change reflected. + ASSERT_OK(yb_admin_client_->ModifyTablePlacementInfo( + table_name(), "c.r.z1,c.r.z2", 2, random_placement_uuid)); + WaitForLoadBalancer(); + + // There must now be tablets on the new tservers. + GetLoadOnTservers(table_name().table_name(), num_tservers, &counts_per_ts); + ASSERT_EQ(counts_per_ts[3], 4); + ASSERT_EQ(counts_per_ts[4], 4); + + // Modify the placement policy with different zones and replication factor but with same + // placement uuid. + ASSERT_OK(yb_admin_client_->ModifyTablePlacementInfo( + table_name(), "c.r.z2", 1, random_placement_uuid)); + WaitForLoadBalancer(); + + GetLoadOnTservers(table_name().table_name(), num_tservers, &counts_per_ts); + // TS3 belongs to zone1 and will have 0 tablets whereas since TS4 is in zone2 it should have 4 + // tablets allotted to it. + ASSERT_EQ(counts_per_ts[3], 0); + ASSERT_EQ(counts_per_ts[4], 4); + +} + +} // namespace integration_tests +} // namespace yb diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index e57876c4c435..c4dffe66a317 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -1468,17 +1468,65 @@ Status CatalogManager::AbortTableCreation(TableInfo* table, return CheckIfNoLongerLeaderAndSetupError(s, resp); } -Status CatalogManager::ValidateTableReplicationInfo(const ReplicationInfoPB& replication_info) { - // TODO(bogdan): add the actual subset rules, instead of just erroring out as not supported. +Result CatalogManager::ResolveReplicationInfo( + const ReplicationInfoPB& table_replication_info) { + + if (IsReplicationInfoSet(table_replication_info)) { + // The table has custom replication info set for it, return it if valid. + RETURN_NOT_OK(ValidateTableReplicationInfo(table_replication_info)); + return table_replication_info; + } + + // Table level replication info not set. Return cluster level + // replication info. + auto l = cluster_config_->LockForRead(); + return l->data().pb.replication_info(); +} + +bool CatalogManager::IsReplicationInfoSet(const ReplicationInfoPB& replication_info) { const auto& live_placement_info = replication_info.live_replicas(); if (!(live_placement_info.placement_blocks().empty() && live_placement_info.num_replicas() <= 0 && live_placement_info.placement_uuid().empty()) || !replication_info.read_replicas().empty() || !replication_info.affinitized_leaders().empty()) { - return STATUS( - InvalidArgument, - "Unsupported: cannot set table level replication info yet."); + + return true; + } + return false; +} + +Status CatalogManager::ValidateTableReplicationInfo(const ReplicationInfoPB& replication_info) { + if (!IsReplicationInfoSet(replication_info)) { + return STATUS(InvalidArgument, "No replication info set."); + } + // We don't support setting any other fields other than live replica placements for now. + if (!replication_info.read_replicas().empty() || + !replication_info.affinitized_leaders().empty()) { + + return STATUS(InvalidArgument, "Only live placement info can be set for table " + "level replication info."); + } + // Today we support setting table level replication info only in clusters where read replica + // placements is not set. Return error if the cluster has read replica placements set. 
+ auto l = cluster_config_->LockForRead(); + const ReplicationInfoPB& cluster_replication_info = l->data().pb.replication_info(); + if (!cluster_replication_info.read_replicas().empty() || + !cluster_replication_info.affinitized_leaders().empty()) { + + return STATUS(InvalidArgument, "Setting table level replication info is not supported " + "for clusters with read replica placements"); + } + // If the replication info has placement_uuid set, verify that it matches the cluster + // placement_uuid. + if (replication_info.live_replicas().placement_uuid().empty()) { + return Status::OK(); + } + if (replication_info.live_replicas().placement_uuid() != + cluster_replication_info.live_replicas().placement_uuid()) { + + return STATUS(InvalidArgument, "Placement uuid for table level replication info " + "must match that of the cluster's live placement info."); } return Status::OK(); } @@ -2130,12 +2178,17 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } } - // Get cluster level placement info. - ReplicationInfoPB replication_info; - { - auto l = cluster_config_->LockForRead(); - replication_info = l->data().pb.replication_info(); + // Verify that custom placement policy has not been specified for colocated table. + if (colocated && IsReplicationInfoSet(req.replication_info())) { + Status s = STATUS(InvalidArgument, "Custom placement policy should not be set for " + "colocated tables"); + return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_TABLE_REPLICATION_INFO, s); } + + // Get placement info. + const ReplicationInfoPB& replication_info = VERIFY_RESULT( + ResolveReplicationInfo(req.replication_info())); + // Calculate number of tablets to be used. int num_tablets = req.schema().table_properties().num_tablets(); if (num_tablets <= 0) { @@ -2196,12 +2249,6 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } } - // Validate the table placement rules are a subset of the cluster ones. - s = ValidateTableReplicationInfo(req.replication_info()); - if (PREDICT_FALSE(!s.ok())) { - return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s); - } - // For index table, populate the index info. IndexInfoPB index_info; @@ -2919,7 +2966,9 @@ scoped_refptr CatalogManager::CreateTableInfo(const CreateTableReques metadata->set_namespace_name(namespace_name); metadata->set_version(0); metadata->set_next_column_id(ColumnId(schema.max_col_id() + 1)); - // TODO(bogdan): add back in replication_info once we allow overrides! + if (req.has_replication_info()) { + metadata->mutable_replication_info()->CopyFrom(req.replication_info()); + } // Use the Schema object passed in, since it has the column IDs already assigned, // whereas the user request PB does not. SchemaToPB(schema, metadata->mutable_schema()); @@ -3827,6 +3876,22 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, has_changes = true; } + // Check if there has been any changes to the placement policies for this table. + if (req->has_replication_info()) { + // If this is a colocated table, it does not make sense to set placement + // policy for this table, as the tablet associated with it is shared by + // multiple tables. + if (table->colocated()) { + const Status s = STATUS(InvalidArgument, + "Placement policy cannot be altered for a colocated table"); + return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); + } + // Validate table replication info. 
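+    // This rejects unsupported fields (read replica or affinitized leader placements) and a
+    // placement_uuid that differs from the cluster's live placement_uuid.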
+ RETURN_NOT_OK(ValidateTableReplicationInfo(req->replication_info())); + table_pb.mutable_replication_info()->CopyFrom(req->replication_info()); + has_changes = true; + } + // TODO(hector): Simplify the AlterSchema workflow to avoid doing the same checks on every layer // this request goes through: https://github.com/YugaByte/yugabyte-db/issues/1882. if (req->has_wal_retention_secs()) { @@ -4045,7 +4110,9 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req, VLOG(1) << " Returning pb.schema() "; } resp->mutable_partition_schema()->CopyFrom(l->data().pb.partition_schema()); - // TODO(bogdan): add back in replication_info once we allow overrides! + if (IsReplicationInfoSet(l->data().pb.replication_info())) { + resp->mutable_replication_info()->CopyFrom(l->data().pb.replication_info()); + } resp->set_create_table_done(!table->IsCreateInProgress()); resp->set_table_type(table->metadata().state().pb.table_type()); resp->mutable_identifier()->set_table_name(l->data().pb.name()); @@ -7002,15 +7069,8 @@ Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_desc tablet->tablet_id()); } - // Validate that we do not have placement blocks in both cluster and table data. - RETURN_NOT_OK(ValidateTableReplicationInfo(table_guard->data().pb.replication_info())); - - // Default to the cluster placement object. - ReplicationInfoPB replication_info; - { - auto l = cluster_config_->LockForRead(); - replication_info = l->data().pb.replication_info(); - } + const ReplicationInfoPB& replication_info = VERIFY_RESULT(ResolveReplicationInfo( + table_guard->data().pb.replication_info())); // Select the set of replicas for the tablet. ConsensusStatePB* cstate = tablet->mutable_metadata()->mutable_dirty() @@ -7020,9 +7080,6 @@ Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_desc consensus::RaftConfigPB *config = cstate->mutable_config(); config->set_opid_index(consensus::kInvalidOpIdIndex); - // TODO: we do this defaulting to cluster if no table data in two places, should refactor and - // have a centralized getter, that will ultimately do the subsetting as well. - Status s = HandlePlacementUsingReplicationInfo(replication_info, ts_descs, config); if (!s.ok()) { return s; diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 8c94339d32c0..9a67fbc877ad 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -1060,9 +1060,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf { const Status& s, CreateTableResponsePB* resp); - // Validates that the passed-in table replication information respects the overall cluster level - // configuration. This should essentially not be more broader reaching than the cluster. As an - // example, if the cluster is confined to AWS, you cannot have tables in GCE. + // Returns 'table_replication_info' itself if set. Otherwise returns the cluster level + // replication info. + Result ResolveReplicationInfo(const ReplicationInfoPB& table_replication_info); + + // Returns whether 'replication_info' has any relevant fields set. + bool IsReplicationInfoSet(const ReplicationInfoPB& replication_info); + + // Validates that 'replication_info' for a table has supported fields set. CHECKED_STATUS ValidateTableReplicationInfo(const ReplicationInfoPB& replication_info); // Report metrics. 
diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index 8aecd81ac73f..8a5de5513d12 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -130,6 +130,9 @@ message MasterErrorPB { LOAD_BALANCER_RECENTLY_ACTIVE = 33; INTERNAL_ERROR = 34; + + // Client set some fields in the table replication info incorrectly. + INVALID_TABLE_REPLICATION_INFO = 35; } // The error code. @@ -1066,6 +1069,9 @@ message AlterTableRequestPB { // request. // This is useful to trigger index permissions update. optional bool force_send_alter_request = 7 [default = false]; + + // Replication information for this table. + optional ReplicationInfoPB replication_info = 8; } message AlterTableResponsePB { diff --git a/src/yb/tools/yb-admin-test.cc b/src/yb/tools/yb-admin-test.cc index 25c010bfad02..07b27fc46547 100644 --- a/src/yb/tools/yb-admin-test.cc +++ b/src/yb/tools/yb-admin-test.cc @@ -556,5 +556,128 @@ TEST_F(AdminCliTest, TestGetClusterLoadBalancerState) { ASSERT_NE(output.find("ENABLED"), std::string::npos); } +TEST_F(AdminCliTest, TestModifyTablePlacementPolicy) { + // Start a cluster with 3 tservers, each corresponding to a different zone. + FLAGS_num_tablet_servers = 3; + FLAGS_num_replicas = 2; + std::vector master_flags; + master_flags.push_back("--enable_load_balancing=true"); + master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); + std::vector ts_flags; + ts_flags.push_back("--placement_cloud=c"); + ts_flags.push_back("--placement_region=r"); + ts_flags.push_back("--placement_zone=z${index}"); + BuildAndStart(ts_flags, master_flags); + + const std::string& master_address = ToString(cluster_->master()->bound_rpc_addr()); + auto client = ASSERT_RESULT(YBClientBuilder() + .add_master_server_addr(master_address) + .Build()); + + // Modify the cluster placement policy to consist of 2 zones. + std::string output; + ASSERT_OK(Subprocess::Call(ToStringVector(GetAdminToolPath(), + "-master_addresses", master_address, "modify_placement_info", "c.r.z0,c.r.z1", 2, ""), + &output)); + + // Create a new table. + const auto extra_table = YBTableName(YQLDatabase::YQL_DATABASE_CQL, + kTableName.namespace_name(), + "extra-table"); + // Start a workload. + TestWorkload workload(cluster_.get()); + workload.set_table_name(extra_table); + workload.set_timeout_allowed(true); + workload.Setup(); + workload.Start(); + + // Verify that the table has no custom placement policy set for it. + std::shared_ptr table; + ASSERT_OK(client->OpenTable(extra_table, &table)); + ASSERT_FALSE(table->replication_info()); + + // Use yb-admin_cli to set a custom placement policy different from that of + // the cluster placement policy for the new table. + ASSERT_OK(Subprocess::Call(ToStringVector(GetAdminToolPath(), + "-master_addresses", master_address, "modify_table_placement_info", + kTableName.namespace_name(), "extra-table", "c.r.z0,c.r.z1,c.r.z2", 3, ""), + &output)); + + // Verify that changing the placement _uuid for a table fails if the + // placement_uuid does not match the cluster live placement_uuid. 
+ const string& random_placement_uuid = "19dfa091-2b53-434f-b8dc-97280a5f8831"; + ASSERT_NOK(Subprocess::Call(ToStringVector(GetAdminToolPath(), + "-master_addresses", master_address, "modify_table_placement_info", + kTableName.namespace_name(), "extra-table", "c.r.z0,c.r.z1,c.r.z2", 3, random_placement_uuid), + &output)); + + ASSERT_OK(client->OpenTable(extra_table, &table)); + ASSERT_TRUE(table->replication_info().get().live_replicas().placement_uuid().empty()); + + // Fetch the placement policy for the table and verify that it matches + // the custom info set previously. + ASSERT_OK(client->OpenTable(extra_table, &table)); + vector found_zones; + found_zones.assign(3, false); + ASSERT_EQ(table->replication_info().get().live_replicas().placement_blocks_size(), 3); + for (int ii = 0; ii < 3; ++ii) { + auto pb = table->replication_info().get().live_replicas().placement_blocks(ii).cloud_info(); + ASSERT_EQ(pb.placement_cloud(), "c"); + ASSERT_EQ(pb.placement_region(), "r"); + if (pb.placement_zone() == "z0") { + found_zones[0] = true; + } else if (pb.placement_zone() == "z1") { + found_zones[1] = true; + } else { + ASSERT_EQ(pb.placement_zone(), "z2"); + found_zones[2] = true; + } + } + for (const bool found : found_zones) { + ASSERT_TRUE(found); + } + + // Perform the same test, but use the table-id instead of table name to set the + // custom placement policy. + const string& id = table->id(); + ASSERT_OK(Subprocess::Call(ToStringVector(GetAdminToolPath(), + "-master_addresses", master_address, "modify_table_placement_info", + Format("tableid.$0", id), "c.r.z1", 1, ""), + &output)); + + // Verify that changing the placement _uuid for a table fails if the + // placement_uuid does not match the cluster live placement_uuid. + ASSERT_NOK(Subprocess::Call(ToStringVector(GetAdminToolPath(), + "-master_addresses", master_address, "modify_table_placement_info", + Format("tableid.$0", id), "c.r.z1", 1, random_placement_uuid), + &output)); + + ASSERT_OK(client->OpenTable(extra_table, &table)); + ASSERT_TRUE(table->replication_info().get().live_replicas().placement_uuid().empty()); + + // Fetch the placement policy for the table and verify that it matches + // the custom info set previously. + ASSERT_OK(client->OpenTable(extra_table, &table)); + ASSERT_EQ(table->replication_info().get().live_replicas().placement_blocks_size(), 1); + auto pb = table->replication_info().get().live_replicas().placement_blocks(0).cloud_info(); + ASSERT_EQ(pb.placement_cloud(), "c"); + ASSERT_EQ(pb.placement_region(), "r"); + ASSERT_EQ(pb.placement_zone(), "z1"); + + // Stop the workload. + workload.StopAndJoin(); + int rows_inserted = workload.rows_inserted(); + LOG(INFO) << "Number of rows inserted: " << rows_inserted; + + sleep(5); + + // Verify that there was no data loss. 
+ ClusterVerifier cluster_verifier(cluster_.get()); + ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); + ASSERT_NO_FATALS(cluster_verifier.CheckRowCount( + extra_table, ClusterVerifier::EXACTLY, rows_inserted)); +} + + } // namespace tools } // namespace yb diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index 0262d73ae1db..4ee684fc652a 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -344,6 +344,45 @@ void ClusterAdminCli::RegisterCommandHandlers(ClusterAdminClientClass* client) { return Status::OK(); }); + static const auto kTableName = "<( )|tableid.>"; + static const auto kPlacementInfo = "placement_info"; + static const auto kReplicationFactor = "replication_factor"; + static const auto kPlacementUuid = "placement_uuid"; + + Register( + "modify_table_placement_info", + Format(" [$0] [$1] [$2] [$3]", kTableName, kPlacementInfo, kReplicationFactor, + kPlacementUuid), + [client](const CLIArguments& args) -> Status { + if (args.size() < 5 || args.size() > 7) { + return ClusterAdminCli::kInvalidArguments; + } + std::string placement_info; + int rf = -1; + std::string placement_uuid; + const auto table_name = VERIFY_RESULT(ResolveSingleTableName( + client, args.begin() + 2, args.end(), + [&placement_info, &rf, &placement_uuid]( + auto i, const auto& end) -> Status { + // Get placement info. + placement_info = *i; + i = std::next(i); + // Get replication factor. + rf = VERIFY_RESULT(CheckedStoi(*i)); + i = std::next(i); + // Get optional placement uuid. + if (i != end) { + placement_uuid = *i; + } + return Status::OK(); + } + )); + RETURN_NOT_OK_PREPEND( + client->ModifyTablePlacementInfo(table_name, placement_info, rf, placement_uuid), + Substitute("Unable to modify placement info for table $0", table_name.ToString())); + return Status::OK(); + }); + Register( "modify_placement_info", " [placement_uuid]", [client](const CLIArguments& args) -> Status { diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index e0c6e14b8bd3..cf6ebd65209e 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -48,6 +48,7 @@ #include "yb/common/redis_constants_common.h" #include "yb/common/wire_protocol.h" #include "yb/client/client.h" +#include "yb/client/table_alterer.h" #include "yb/client/table_creator.h" #include "yb/master/master.pb.h" #include "yb/master/master_error.h" @@ -1574,6 +1575,68 @@ Status ClusterAdminClient::FillPlacementInfo( return Status::OK(); } +Status ClusterAdminClient::ModifyTablePlacementInfo( + const YBTableName& table_name, const std::string& placement_info, int replication_factor, + const std::string& optional_uuid) { + + std::vector placement_info_split = strings::Split( + placement_info, ",", strings::SkipEmpty()); + if (placement_info_split.size() < 1) { + return STATUS(InvalidCommand, "Table placement config must be a list of " + "placement infos seperated by commas. " + "Format: 'cloud1.region1.zone1,cloud2.region2.zone2,cloud3.region3.zone3 ..." + + std::to_string(placement_info_split.size())); + } + + master::PlacementInfoPB* live_replicas = new master::PlacementInfoPB; + live_replicas->set_num_replicas(replication_factor); + // Iterate over the placement blocks of the placementInfo structure. 
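+  // For example, "c1.r1.z1,c1.r1.z2" with replication_factor 2 produces two placement blocks,
+  // (c1, r1, z1) and (c1, r1, z2), each with min_num_replicas set to 1.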
+ for (int iter = 0; iter < placement_info_split.size(); iter++) { + std::vector block = strings::Split(placement_info_split[iter], ".", + strings::SkipEmpty()); + if (block.size() != 3) { + return STATUS(InvalidCommand, "Each placement info must have exactly 3 values seperated" + "by dots that denote cloud, region and zone. Block: " + placement_info_split[iter] + + " is invalid"); + } + auto pb = live_replicas->add_placement_blocks(); + pb->mutable_cloud_info()->set_placement_cloud(block[0]); + pb->mutable_cloud_info()->set_placement_region(block[1]); + pb->mutable_cloud_info()->set_placement_zone(block[2]); + // TODO: Should this also be passed in as input? + pb->set_min_num_replicas(1); + } + + if (!optional_uuid.empty()) { + // If we have an optional uuid, set it. + live_replicas->set_placement_uuid(optional_uuid); + } + + master::ReplicationInfoPB replication_info; + // Merge the obtained info with the existing table replication info. + std::shared_ptr table; + RETURN_NOT_OK_PREPEND(yb_client_->OpenTable(table_name, &table), + "Fetching table schema failed!"); + + // If it does not exist, fetch the cluster replication info. + if (!table->replication_info()) { + auto resp_cluster_config = VERIFY_RESULT(GetMasterClusterConfig()); + master::SysClusterConfigEntryPB* sys_cluster_config_entry = + resp_cluster_config.mutable_cluster_config(); + replication_info.CopyFrom(sys_cluster_config_entry->replication_info()); + } else { + // Table replication info exists, copy it over. + replication_info.CopyFrom(table->replication_info().get()); + } + + // Put in the new live placement info. + replication_info.set_allocated_live_replicas(live_replicas); + + std::unique_ptr table_alterer( + yb_client_->NewTableAlterer(table_name)); + return table_alterer->replication_info(replication_info)->Alter(); +} + Status ClusterAdminClient::ModifyPlacementInfo( std::string placement_info, int replication_factor, const std::string& optional_uuid) { diff --git a/src/yb/tools/yb-admin_client.h b/src/yb/tools/yb-admin_client.h index b0e9f3eb77aa..9250ce698ceb 100644 --- a/src/yb/tools/yb-admin_client.h +++ b/src/yb/tools/yb-admin_client.h @@ -193,6 +193,11 @@ class ClusterAdminClient { int timeout_secs, bool is_compaction); + CHECKED_STATUS ModifyTablePlacementInfo(const client::YBTableName& table_name, + const std::string& placement_info, + int replication_factor, + const std::string& optional_uuid); + CHECKED_STATUS ModifyPlacementInfo(std::string placement_infos, int replication_factor, const std::string& optional_uuid);
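A minimal sketch of how client code could use the new YBTableAlterer::replication_info() API added by this patch, modeled on load_balancer_placement_policy-test.cc and ClusterAdminClient::ModifyTablePlacementInfo above. The function name SetTwoZoneLivePlacement and the c.r.z1 / c.r.z2 zones are illustrative only; the equivalent yb-admin invocation, as exercised in yb-admin-test.cc, takes the form:
yb-admin -master_addresses <addresses> modify_table_placement_info <namespace> <table_name> c.r.z1,c.r.z2 2 [placement_uuid]

#include <memory>

#include "yb/client/client.h"
#include "yb/client/table_alterer.h"
#include "yb/client/yb_table_name.h"
#include "yb/master/master.pb.h"
#include "yb/util/status.h"

namespace yb {

// Pin a table's live replicas to zones c.r.z1 and c.r.z2 with a replication factor of 2.
Status SetTwoZoneLivePlacement(client::YBClient* client, const client::YBTableName& table_name) {
  master::ReplicationInfoPB replication_info;
  auto* live_replicas = replication_info.mutable_live_replicas();
  live_replicas->set_num_replicas(2);
  for (const char* zone : {"z1", "z2"}) {
    auto* block = live_replicas->add_placement_blocks();
    auto* cloud_info = block->mutable_cloud_info();
    cloud_info->set_placement_cloud("c");
    cloud_info->set_placement_region("r");
    cloud_info->set_placement_zone(zone);
    block->set_min_num_replicas(1);  // At least one replica in each of the two zones.
  }
  std::unique_ptr<client::YBTableAlterer> alterer(client->NewTableAlterer(table_name));
  return alterer->replication_info(replication_info)->Alter();
}

}  // namespace yb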