diff --git a/include/dsn/utility/utils.h b/include/dsn/utility/utils.h index fa7085c124..55beb1adaa 100644 --- a/include/dsn/utility/utils.h +++ b/include/dsn/utility/utils.h @@ -108,5 +108,28 @@ bool hostname(const dsn::rpc_address &address, std::string *hostname_result); // valid_ip_network_order -> return TRUE && hostname_result=hostname | // invalid_ip_network_order -> return FALSE bool hostname_from_ip(uint32_t ip, std::string *hostname_result); + +template +std::multimap flip_map(const std::map &source) +{ + std::multimap target; + std::transform(source.begin(), + source.end(), + std::inserter(target, target.begin()), + [](const std::pair &p) { return std::pair(p.second, p.first); }); + return target; +} + +template +std::set get_intersection(const std::set &set1, const std::set &set2) +{ + std::set intersection; + std::set_intersection(set1.begin(), + set1.end(), + set2.begin(), + set2.end(), + std::inserter(intersection, intersection.begin())); + return intersection; +} } // namespace utils } // namespace dsn diff --git a/src/meta/greedy_load_balancer.cpp b/src/meta/greedy_load_balancer.cpp index 072c3011c9..af0b808815 100644 --- a/src/meta/greedy_load_balancer.cpp +++ b/src/meta/greedy_load_balancer.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include "greedy_load_balancer.h" #include "meta_data.h" @@ -39,18 +40,24 @@ namespace replication { DSN_DEFINE_bool("meta_server", balance_cluster, false, "whether to enable cluster balancer"); DSN_TAG_VARIABLE(balance_cluster, FT_MUTABLE); -uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id) +DSN_DEFINE_uint32("meta_server", + balance_op_count_per_round, + 10, + "balance operation count per round for cluster balancer"); +DSN_TAG_VARIABLE(balance_op_count_per_round, FT_MUTABLE); + +uint32_t get_partition_count(const node_state &ns, cluster_balance_type type, int32_t app_id) { unsigned count = 0; switch (type) { - case cluster_balance_type::Secondary: + case cluster_balance_type::COPY_SECONDARY: if (app_id > 0) { count = ns.partition_count(app_id) - ns.primary_count(app_id); } else { count = ns.partition_count() - ns.primary_count(); } break; - case cluster_balance_type::Primary: + case cluster_balance_type::COPY_PRIMARY: if (app_id > 0) { count = ns.primary_count(app_id); } else { @@ -77,6 +84,23 @@ uint32_t get_skew(const std::map &count_map) return max - min; } +void get_min_max_set(const std::map &node_count_map, + /*out*/ std::set &min_set, + /*out*/ std::set &max_set) +{ + std::multimap count_multimap = utils::flip_map(node_count_map); + + auto range = count_multimap.equal_range(count_multimap.begin()->first); + for (auto iter = range.first; iter != range.second; ++iter) { + min_set.insert(iter->second); + } + + range = count_multimap.equal_range(count_multimap.rbegin()->first); + for (auto iter = range.first; iter != range.second; ++iter) { + max_set.insert(iter->second); + } +} + greedy_load_balancer::greedy_load_balancer(meta_service *_svc) : simple_load_balancer(_svc), _ctrl_balancer_ignored_apps(nullptr), @@ -979,36 +1003,50 @@ void greedy_load_balancer::balance_cluster() } } - bool need_continue = cluster_replica_balance(t_global_view, cluster_balance_type::Secondary); + bool need_continue = cluster_replica_balance( + t_global_view, cluster_balance_type::COPY_SECONDARY, *t_migration_result); if (!need_continue) { return; } + + // TBD(zlw): copy primary } bool greedy_load_balancer::cluster_replica_balance(const meta_view *global_view, - const cluster_balance_type type) + const cluster_balance_type type, + /*out*/ migration_list &list) { - bool enough_information = do_cluster_replica_balance(global_view, type); + bool enough_information = do_cluster_replica_balance(global_view, type, list); if (!enough_information) { return false; } - if (!t_migration_result->empty()) { - ddebug_f( - "migration count of copy {} = {}", enum_to_string(type), t_migration_result->size()); + if (!list.empty()) { + ddebug_f("migration count of {} = {}", enum_to_string(type), list.size()); return false; } return true; } bool greedy_load_balancer::do_cluster_replica_balance(const meta_view *global_view, - const cluster_balance_type type) + const cluster_balance_type type, + /*out*/ migration_list &list) { cluster_migration_info cluster_info; if (!get_cluster_migration_info(global_view, type, cluster_info)) { return false; } - /// TBD(zlw) + partition_set selected_pid; + move_info next_move; + while (get_next_move(cluster_info, selected_pid, next_move)) { + if (!apply_move(next_move, selected_pid, list, cluster_info)) { + break; + } + if (list.size() >= FLAGS_balance_op_count_per_round) { + break; + } + } + return true; } @@ -1030,8 +1068,14 @@ bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_vi app_mapper apps; for (const auto &kv : all_apps) { const std::shared_ptr &app = kv.second; - if (is_ignored_app(app->app_id) || app->is_bulk_loading || app->splitting()) { - return false; + auto ignored = is_ignored_app(app->app_id); + if (ignored || app->is_bulk_loading || app->splitting()) { + ddebug_f("skip to balance app({}), ignored={}, bulk loading={}, splitting={}", + app->app_name, + ignored, + app->is_bulk_loading, + app->splitting()); + continue; } if (app->status == app_status::AS_AVAILABLE) { apps[app->app_id] = app; @@ -1054,7 +1098,7 @@ bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_vi get_node_migration_info(ns, apps, info); cluster_info.nodes_info.emplace(kv.first, std::move(info)); - auto count = get_count(ns, type, -1); + auto count = get_partition_count(ns, type, -1); cluster_info.replicas_count[kv.first] = count; } @@ -1085,7 +1129,7 @@ bool greedy_load_balancer::get_app_migration_info(std::shared_ptr app for (const auto &it : nodes) { const node_state &ns = it.second; - auto count = get_count(ns, type, app->app_id); + auto count = get_partition_count(ns, type, app->app_id); info.replicas_count[ns.addr()] = count; } @@ -1116,6 +1160,105 @@ void greedy_load_balancer::get_node_migration_info(const node_state &ns, } } +bool greedy_load_balancer::get_next_move(const cluster_migration_info &cluster_info, + const partition_set &selected_pid, + /*out*/ move_info &next_move) +{ + // key-app skew, value-app id + std::multimap app_skew_multimap = utils::flip_map(cluster_info.apps_skew); + auto max_app_skew = app_skew_multimap.rbegin()->first; + if (max_app_skew == 0) { + ddebug_f("every app is balanced and any move will unbalance a app"); + return false; + } + + auto server_skew = get_skew(cluster_info.replicas_count); + if (max_app_skew <= 1 && server_skew <= 1) { + ddebug_f("every app is balanced and the cluster as a whole is balanced"); + return false; + } + + /** + * Among the apps with maximum skew, attempt to pick a app where there is + * a move that improves the app skew and the cluster skew, if possible. If + * not, attempt to pick a move that improves the app skew. + **/ + std::set cluster_min_count_nodes; + std::set cluster_max_count_nodes; + get_min_max_set(cluster_info.replicas_count, cluster_min_count_nodes, cluster_max_count_nodes); + + bool found = false; + auto app_range = app_skew_multimap.equal_range(max_app_skew); + for (auto iter = app_range.first; iter != app_range.second; ++iter) { + auto app_id = iter->second; + auto it = cluster_info.apps_info.find(app_id); + if (it == cluster_info.apps_info.end()) { + continue; + } + auto app_map = it->second.replicas_count; + std::set app_min_count_nodes; + std::set app_max_count_nodes; + get_min_max_set(app_map, app_min_count_nodes, app_max_count_nodes); + + /** + * Compute the intersection of the replica servers most loaded for the app + * with the replica servers most loaded overall, and likewise for least loaded. + * These are our ideal candidates for moving from and to, respectively. + **/ + std::set app_cluster_min_set = + utils::get_intersection(app_min_count_nodes, cluster_min_count_nodes); + std::set app_cluster_max_set = + utils::get_intersection(app_max_count_nodes, cluster_max_count_nodes); + + /** + * Do not move replicas of a balanced app if the least (most) loaded + * servers overall do not intersect the servers hosting the least (most) + * replicas of the app. Moving a replica in that case might keep the + * cluster skew the same or make it worse while keeping the app balanced. + **/ + std::multimap app_count_multimap = utils::flip_map(app_map); + if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 && + (app_cluster_min_set.empty() || app_cluster_max_set.empty())) { + ddebug_f( + "do not move replicas of a balanced app if the least (most) loaded servers overall " + "do not intersect the servers hosting the least (most) replicas of the app"); + continue; + } + + if (pick_up_move(cluster_info, + app_cluster_max_set.empty() ? app_max_count_nodes : app_cluster_max_set, + app_cluster_min_set.empty() ? app_min_count_nodes : app_cluster_min_set, + app_id, + selected_pid, + next_move)) { + found = true; + break; + } + } + + return found; +} + +bool greedy_load_balancer::pick_up_move(const cluster_migration_info &cluster_info, + const std::set &max_nodes, + const std::set &min_nodes, + const int32_t app_id, + const partition_set &selected_pid, + /*out*/ move_info &move_info) +{ + // TBD(zlw) + return false; +} + +bool greedy_load_balancer::apply_move(const move_info &move, + /*out*/ partition_set &selected_pids, + /*out*/ migration_list &list, + /*out*/ cluster_migration_info &cluster_info) +{ + // TBD(zlw) + return false; +} + bool greedy_load_balancer::balance(meta_view view, migration_list &list) { ddebug("balancer round"); diff --git a/src/meta/greedy_load_balancer.h b/src/meta/greedy_load_balancer.h index 5c3172fae6..2fd2d6cd7d 100644 --- a/src/meta/greedy_load_balancer.h +++ b/src/meta/greedy_load_balancer.h @@ -41,17 +41,20 @@ namespace dsn { namespace replication { enum class cluster_balance_type { - Primary = 0, - Secondary, - Invalid, + COPY_PRIMARY = 0, + COPY_SECONDARY, + INVALID, }; -ENUM_BEGIN(cluster_balance_type, cluster_balance_type::Invalid) -ENUM_REG(cluster_balance_type::Primary) -ENUM_REG(cluster_balance_type::Secondary) +ENUM_BEGIN(cluster_balance_type, cluster_balance_type::INVALID) +ENUM_REG(cluster_balance_type::COPY_PRIMARY) +ENUM_REG(cluster_balance_type::COPY_SECONDARY) ENUM_END(cluster_balance_type) -uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id); +uint32_t get_partition_count(const node_state &ns, cluster_balance_type type, int32_t app_id); uint32_t get_skew(const std::map &count_map); +void get_min_max_set(const std::map &node_count_map, + /*out*/ std::set &min_set, + /*out*/ std::set &max_set); class greedy_load_balancer : public simple_load_balancer { @@ -153,9 +156,13 @@ class greedy_load_balancer : public simple_load_balancer void balance_cluster(); - bool cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type); + bool cluster_replica_balance(const meta_view *global_view, + const cluster_balance_type type, + /*out*/ migration_list &list); - bool do_cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type); + bool do_cluster_replica_balance(const meta_view *global_view, + const cluster_balance_type type, + /*out*/ migration_list &list); struct app_migration_info { @@ -187,13 +194,12 @@ class greedy_load_balancer : public simple_load_balancer struct node_migration_info { rpc_address address; + // key-disk tag, value-partition set std::map partitions; partition_set future_partitions; bool operator<(const node_migration_info &another) const { - if (address < another.address) - return true; - return false; + return address < another.address; } bool operator==(const node_migration_info &another) const { @@ -210,6 +216,15 @@ class greedy_load_balancer : public simple_load_balancer std::map replicas_count; }; + struct move_info + { + gpid pid; + rpc_address source_node; + std::string source_disk_tag; + rpc_address target_node; + balance_type type; + }; + bool get_cluster_migration_info(const meta_view *global_view, const cluster_balance_type type, /*out*/ cluster_migration_info &cluster_info); @@ -223,6 +238,22 @@ class greedy_load_balancer : public simple_load_balancer const app_mapper &all_apps, /*out*/ node_migration_info &info); + bool get_next_move(const cluster_migration_info &cluster_info, + const partition_set &selected_pid, + /*out*/ move_info &next_move); + + bool pick_up_move(const cluster_migration_info &cluster_info, + const std::set &max_nodes, + const std::set &min_nodes, + const int32_t app_id, + const partition_set &selected_pid, + /*out*/ move_info &move_info); + + bool apply_move(const move_info &move, + /*out*/ partition_set &selected_pids, + /*out*/ migration_list &list, + /*out*/ cluster_migration_info &cluster_info); + bool all_replica_infos_collected(const node_state &ns); // using t_global_view to get disk_tag of node's pid const std::string &get_disk_tag(const dsn::rpc_address &node, const dsn::gpid &pid); @@ -251,7 +282,7 @@ class greedy_load_balancer : public simple_load_balancer FRIEND_TEST(greedy_load_balancer, app_migration_info); FRIEND_TEST(greedy_load_balancer, node_migration_info); FRIEND_TEST(greedy_load_balancer, get_skew); - FRIEND_TEST(greedy_load_balancer, get_count); + FRIEND_TEST(greedy_load_balancer, get_partition_count); FRIEND_TEST(greedy_load_balancer, get_app_migration_info); FRIEND_TEST(greedy_load_balancer, get_node_migration_info); }; diff --git a/src/meta/test/greedy_load_balancer_test.cpp b/src/meta/test/greedy_load_balancer_test.cpp index f345324e95..67b374397d 100644 --- a/src/meta/test/greedy_load_balancer_test.cpp +++ b/src/meta/test/greedy_load_balancer_test.cpp @@ -77,7 +77,7 @@ TEST(greedy_load_balancer, get_skew) ASSERT_EQ(get_skew(count_map), count_map.rbegin()->second - count_map.begin()->second); } -TEST(greedy_load_balancer, get_count) +TEST(greedy_load_balancer, get_partition_count) { node_state ns; int apid = 1; @@ -86,8 +86,8 @@ TEST(greedy_load_balancer, get_count) ns.put_partition(gpid(apid, 2), false); ns.put_partition(gpid(apid, 3), false); - ASSERT_EQ(get_count(ns, cluster_balance_type::Primary, apid), 1); - ASSERT_EQ(get_count(ns, cluster_balance_type::Secondary, apid), 3); + ASSERT_EQ(get_partition_count(ns, cluster_balance_type::COPY_PRIMARY, apid), 1); + ASSERT_EQ(get_partition_count(ns, cluster_balance_type::COPY_SECONDARY, apid), 3); } TEST(greedy_load_balancer, get_app_migration_info) @@ -114,14 +114,14 @@ TEST(greedy_load_balancer, get_app_migration_info) { app->partitions[0].max_replica_count = 100; auto res = balancer.get_app_migration_info( - app, nodes, cluster_balance_type::Primary, migration_info); + app, nodes, cluster_balance_type::COPY_PRIMARY, migration_info); ASSERT_FALSE(res); } { app->partitions[0].max_replica_count = 1; auto res = balancer.get_app_migration_info( - app, nodes, cluster_balance_type::Primary, migration_info); + app, nodes, cluster_balance_type::COPY_PRIMARY, migration_info); ASSERT_TRUE(res); ASSERT_EQ(migration_info.app_id, appid); ASSERT_EQ(migration_info.app_name, appname); @@ -172,5 +172,24 @@ TEST(greedy_load_balancer, get_node_migration_info) ASSERT_EQ(migration_info.partitions.at(disk_tag).size(), 1); ASSERT_EQ(*migration_info.partitions.at(disk_tag).begin(), pid); } + +TEST(greedy_load_balancer, get_min_max_set) +{ + + std::map node_count_map; + node_count_map.emplace(rpc_address(1, 10086), 1); + node_count_map.emplace(rpc_address(2, 10086), 3); + node_count_map.emplace(rpc_address(3, 10086), 5); + node_count_map.emplace(rpc_address(4, 10086), 5); + + std::set min_set, max_set; + get_min_max_set(node_count_map, min_set, max_set); + + ASSERT_EQ(min_set.size(), 1); + ASSERT_EQ(*min_set.begin(), rpc_address(1, 10086)); + ASSERT_EQ(max_set.size(), 2); + ASSERT_EQ(*max_set.begin(), rpc_address(3, 10086)); + ASSERT_EQ(*max_set.rbegin(), rpc_address(4, 10086)); +} } // namespace replication } // namespace dsn diff --git a/src/utils/test/utils.cpp b/src/utils/test/utils.cpp index df4f592512..7cf009aa73 100644 --- a/src/utils/test/utils.cpp +++ b/src/utils/test/utils.cpp @@ -281,3 +281,39 @@ TEST(core, ref_ptr) z = foo_ptr(); EXPECT_TRUE(count == 0); } + +TEST(core, flip_map) +{ + std::map source; + source.emplace(3, 1); + source.emplace(2, 1); + source.emplace(1, 1); + + auto target = flip_map(source); + ASSERT_EQ(target.size(), 3); + ASSERT_EQ(target.count(1), 3); + ASSERT_EQ(target.count(2), 0); + ASSERT_EQ(target.count(3), 0); + std::string values; + for (auto it = target.equal_range(1); it.first != it.second; it.first++) { + values += std::to_string(it.first->second); + } + ASSERT_EQ(values, "123"); +} + +TEST(core, get_intersection) +{ + std::set set1; + set1.insert(1); + set1.insert(2); + set1.insert(3); + + std::set set2; + set2.insert(3); + set2.insert(4); + set2.insert(5); + + auto intersection = utils::get_intersection(set1, set2); + ASSERT_EQ(intersection.size(), 1); + ASSERT_EQ(*intersection.begin(), 3); +}