Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: add implementation of get next move (#872)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Aug 6, 2021
1 parent 4bddefe commit e89df8d
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 33 deletions.
23 changes: 23 additions & 0 deletions include/dsn/utility/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,28 @@ bool hostname(const dsn::rpc_address &address, std::string *hostname_result);
// valid_ip_network_order -> return TRUE && hostname_result=hostname |
// invalid_ip_network_order -> return FALSE
bool hostname_from_ip(uint32_t ip, std::string *hostname_result);

// Builds the inverse of `source`: every (key, value) entry becomes a (value, key)
// entry of the returned multimap. A multimap is used because several keys of
// `source` may share the same value. The result is ordered by the original values,
// so begin()/rbegin() give the entries with the smallest/largest value.
template <typename A, typename B>
std::multimap<B, A> flip_map(const std::map<A, B> &source)
{
    // Bind to the map's real element type (std::pair<const A, B>). Taking
    // `const std::pair<A, B> &` instead would force an implicit conversion, i.e. a
    // temporary copy of every key and value, before the flipped pair is even built.
    using source_entry = typename std::map<A, B>::value_type;

    std::multimap<B, A> target;
    std::transform(source.begin(),
                   source.end(),
                   std::inserter(target, target.begin()),
                   [](const source_entry &p) { return std::pair<B, A>(p.second, p.first); });
    return target;
}

// Returns the elements that are present in both `set1` and `set2`.
// Both inputs are ordered sets, so a single simultaneous walk over the two
// sequences is enough to collect the common elements in order.
template <typename T>
std::set<T> get_intersection(const std::set<T> &set1, const std::set<T> &set2)
{
    std::set<T> common;
    auto lhs = set1.begin();
    auto rhs = set2.begin();
    while (lhs != set1.end() && rhs != set2.end()) {
        if (*lhs < *rhs) {
            // *lhs is smaller than anything left in set2, so it cannot be shared.
            ++lhs;
        } else if (*rhs < *lhs) {
            ++rhs;
        } else {
            // Neither is smaller: the element is in both sets. Elements arrive in
            // ascending order, so the end of `common` is always the right hint.
            common.insert(common.end(), *lhs);
            ++lhs;
            ++rhs;
        }
    }
    return common;
}
} // namespace utils
} // namespace dsn
173 changes: 158 additions & 15 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <queue>
#include <dsn/tool-api/command_manager.h>
#include <dsn/utility/math.h>
#include <dsn/utility/utils.h>
#include <dsn/dist/fmt_logging.h>
#include "greedy_load_balancer.h"
#include "meta_data.h"
Expand All @@ -39,18 +40,24 @@ namespace replication {
DSN_DEFINE_bool("meta_server", balance_cluster, false, "whether to enable cluster balancer");
DSN_TAG_VARIABLE(balance_cluster, FT_MUTABLE);

uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id)
DSN_DEFINE_uint32("meta_server",
balance_op_count_per_round,
10,
"balance operation count per round for cluster balancer");
DSN_TAG_VARIABLE(balance_op_count_per_round, FT_MUTABLE);

uint32_t get_partition_count(const node_state &ns, cluster_balance_type type, int32_t app_id)
{
unsigned count = 0;
switch (type) {
case cluster_balance_type::Secondary:
case cluster_balance_type::COPY_SECONDARY:
if (app_id > 0) {
count = ns.partition_count(app_id) - ns.primary_count(app_id);
} else {
count = ns.partition_count() - ns.primary_count();
}
break;
case cluster_balance_type::Primary:
case cluster_balance_type::COPY_PRIMARY:
if (app_id > 0) {
count = ns.primary_count(app_id);
} else {
Expand All @@ -77,6 +84,23 @@ uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map)
return max - min;
}

void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
/*out*/ std::set<rpc_address> &min_set,
/*out*/ std::set<rpc_address> &max_set)
{
std::multimap<uint32_t, rpc_address> count_multimap = utils::flip_map(node_count_map);

auto range = count_multimap.equal_range(count_multimap.begin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
min_set.insert(iter->second);
}

range = count_multimap.equal_range(count_multimap.rbegin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
max_set.insert(iter->second);
}
}

greedy_load_balancer::greedy_load_balancer(meta_service *_svc)
: simple_load_balancer(_svc),
_ctrl_balancer_ignored_apps(nullptr),
Expand Down Expand Up @@ -979,36 +1003,50 @@ void greedy_load_balancer::balance_cluster()
}
}

bool need_continue = cluster_replica_balance(t_global_view, cluster_balance_type::Secondary);
bool need_continue = cluster_replica_balance(
t_global_view, cluster_balance_type::COPY_SECONDARY, *t_migration_result);
if (!need_continue) {
return;
}

// TBD(zlw): copy primary
}

bool greedy_load_balancer::cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type)
const cluster_balance_type type,
/*out*/ migration_list &list)
{
bool enough_information = do_cluster_replica_balance(global_view, type);
bool enough_information = do_cluster_replica_balance(global_view, type, list);
if (!enough_information) {
return false;
}
if (!t_migration_result->empty()) {
ddebug_f(
"migration count of copy {} = {}", enum_to_string(type), t_migration_result->size());
if (!list.empty()) {
ddebug_f("migration count of {} = {}", enum_to_string(type), list.size());
return false;
}
return true;
}

bool greedy_load_balancer::do_cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type)
const cluster_balance_type type,
/*out*/ migration_list &list)
{
cluster_migration_info cluster_info;
if (!get_cluster_migration_info(global_view, type, cluster_info)) {
return false;
}

/// TBD(zlw)
partition_set selected_pid;
move_info next_move;
while (get_next_move(cluster_info, selected_pid, next_move)) {
if (!apply_move(next_move, selected_pid, list, cluster_info)) {
break;
}
if (list.size() >= FLAGS_balance_op_count_per_round) {
break;
}
}

return true;
}

Expand All @@ -1030,8 +1068,14 @@ bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_vi
app_mapper apps;
for (const auto &kv : all_apps) {
const std::shared_ptr<app_state> &app = kv.second;
if (is_ignored_app(app->app_id) || app->is_bulk_loading || app->splitting()) {
return false;
auto ignored = is_ignored_app(app->app_id);
if (ignored || app->is_bulk_loading || app->splitting()) {
ddebug_f("skip to balance app({}), ignored={}, bulk loading={}, splitting={}",
app->app_name,
ignored,
app->is_bulk_loading,
app->splitting());
continue;
}
if (app->status == app_status::AS_AVAILABLE) {
apps[app->app_id] = app;
Expand All @@ -1054,7 +1098,7 @@ bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_vi
get_node_migration_info(ns, apps, info);
cluster_info.nodes_info.emplace(kv.first, std::move(info));

auto count = get_count(ns, type, -1);
auto count = get_partition_count(ns, type, -1);
cluster_info.replicas_count[kv.first] = count;
}

Expand Down Expand Up @@ -1085,7 +1129,7 @@ bool greedy_load_balancer::get_app_migration_info(std::shared_ptr<app_state> app

for (const auto &it : nodes) {
const node_state &ns = it.second;
auto count = get_count(ns, type, app->app_id);
auto count = get_partition_count(ns, type, app->app_id);
info.replicas_count[ns.addr()] = count;
}

Expand Down Expand Up @@ -1116,6 +1160,105 @@ void greedy_load_balancer::get_node_migration_info(const node_state &ns,
}
}

/**
 * Chooses the next replica move for the cluster balancer.
 *
 * cluster_info:    per-app skew, per-app and per-node replica counts of the cluster.
 * selected_pid:    forwarded unchanged to pick_up_move().
 * next_move (out): filled in by pick_up_move() when a move is found.
 *
 * Returns true when pick_up_move() found a move for one of the apps with the
 * maximum skew; false when everything is already balanced, when no candidate app
 * yields a move, or when `cluster_info` carries no skew/count data to work with.
 **/
bool greedy_load_balancer::get_next_move(const cluster_migration_info &cluster_info,
                                         const partition_set &selected_pid,
                                         /*out*/ move_info &next_move)
{
    // Without any app there is nothing to move, and rbegin() below must not be
    // dereferenced on an empty multimap.
    if (cluster_info.apps_skew.empty()) {
        return false;
    }

    // key-app skew, value-app id
    std::multimap<uint32_t, int32_t> app_skew_multimap = utils::flip_map(cluster_info.apps_skew);
    auto max_app_skew = app_skew_multimap.rbegin()->first;
    if (max_app_skew == 0) {
        ddebug_f("every app is balanced and any move will unbalance a app");
        return false;
    }

    auto server_skew = get_skew(cluster_info.replicas_count);
    if (max_app_skew <= 1 && server_skew <= 1) {
        ddebug_f("every app is balanced and the cluster as a whole is balanced");
        return false;
    }

    // No node counts means no source/target candidates; get_min_max_set() would
    // otherwise look at the first/last entry of an empty container.
    if (cluster_info.replicas_count.empty()) {
        return false;
    }

    /**
     * Among the apps with maximum skew, attempt to pick an app where there is
     * a move that improves the app skew and the cluster skew, if possible. If
     * not, attempt to pick a move that improves the app skew.
     **/
    std::set<rpc_address> cluster_min_count_nodes;
    std::set<rpc_address> cluster_max_count_nodes;
    get_min_max_set(cluster_info.replicas_count, cluster_min_count_nodes, cluster_max_count_nodes);

    auto app_range = app_skew_multimap.equal_range(max_app_skew);
    for (auto iter = app_range.first; iter != app_range.second; ++iter) {
        auto app_id = iter->second;
        auto it = cluster_info.apps_info.find(app_id);
        if (it == cluster_info.apps_info.end()) {
            continue;
        }
        // Bind by reference: the per-app count map is only read here.
        const auto &app_map = it->second.replicas_count;
        if (app_map.empty()) {
            // No per-node counts for this app, hence no candidate nodes.
            continue;
        }
        std::set<rpc_address> app_min_count_nodes;
        std::set<rpc_address> app_max_count_nodes;
        get_min_max_set(app_map, app_min_count_nodes, app_max_count_nodes);

        /**
         * Compute the intersection of the replica servers most loaded for the app
         * with the replica servers most loaded overall, and likewise for least loaded.
         * These are our ideal candidates for moving from and to, respectively.
         **/
        std::set<rpc_address> app_cluster_min_set =
            utils::get_intersection(app_min_count_nodes, cluster_min_count_nodes);
        std::set<rpc_address> app_cluster_max_set =
            utils::get_intersection(app_max_count_nodes, cluster_max_count_nodes);

        /**
         * Do not move replicas of a balanced app if the least (most) loaded
         * servers overall do not intersect the servers hosting the least (most)
         * replicas of the app. Moving a replica in that case might keep the
         * cluster skew the same or make it worse while keeping the app balanced.
         **/
        std::multimap<uint32_t, rpc_address> app_count_multimap = utils::flip_map(app_map);
        if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 &&
            (app_cluster_min_set.empty() || app_cluster_max_set.empty())) {
            ddebug_f(
                "do not move replicas of a balanced app if the least (most) loaded servers overall "
                "do not intersect the servers hosting the least (most) replicas of the app");
            continue;
        }

        // Prefer nodes that are extreme both for the app and for the cluster; fall
        // back to the app-only extremes when the intersection is empty.
        if (pick_up_move(cluster_info,
                         app_cluster_max_set.empty() ? app_max_count_nodes : app_cluster_max_set,
                         app_cluster_min_set.empty() ? app_min_count_nodes : app_cluster_min_set,
                         app_id,
                         selected_pid,
                         next_move)) {
            return true;
        }
    }

    return false;
}

// Intended to choose one concrete move for `app_id`; not implemented yet.
// The current body always returns false and leaves the out-parameter untouched,
// so get_next_move() never reports a move.
//
// max_nodes / min_nodes: get_next_move() passes the most / least loaded node sets
//                        of the app (intersected with the cluster-wide sets when
//                        that intersection is not empty).
// selected_pid:          forwarded from get_next_move(); unused here so far.
// move_info (out):       presumably receives the chosen move once implemented.
// NOTE(review): the out-parameter is named like its type `move_info`, which hides
// the type name inside this function body.
bool greedy_load_balancer::pick_up_move(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const std::set<rpc_address> &min_nodes,
const int32_t app_id,
const partition_set &selected_pid,
/*out*/ move_info &move_info)
{
// TBD(zlw)
return false;
}

// Intended to apply `move` to the balancer's bookkeeping; not implemented yet.
// The current body always returns false and modifies none of the out-parameters.
// do_cluster_replica_balance() stops its move loop as soon as this returns false.
//
// selected_pids / list / cluster_info (out): presumably updated to reflect the
// applied move once implemented -- TODO confirm when the body is written.
bool greedy_load_balancer::apply_move(const move_info &move,
/*out*/ partition_set &selected_pids,
/*out*/ migration_list &list,
/*out*/ cluster_migration_info &cluster_info)
{
// TBD(zlw)
return false;
}

bool greedy_load_balancer::balance(meta_view view, migration_list &list)
{
ddebug("balancer round");
Expand Down
57 changes: 44 additions & 13 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ namespace dsn {
namespace replication {
enum class cluster_balance_type
{
Primary = 0,
Secondary,
Invalid,
COPY_PRIMARY = 0,
COPY_SECONDARY,
INVALID,
};
ENUM_BEGIN(cluster_balance_type, cluster_balance_type::Invalid)
ENUM_REG(cluster_balance_type::Primary)
ENUM_REG(cluster_balance_type::Secondary)
ENUM_BEGIN(cluster_balance_type, cluster_balance_type::INVALID)
ENUM_REG(cluster_balance_type::COPY_PRIMARY)
ENUM_REG(cluster_balance_type::COPY_SECONDARY)
ENUM_END(cluster_balance_type)

uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id);
uint32_t get_partition_count(const node_state &ns, cluster_balance_type type, int32_t app_id);
uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map);
void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
/*out*/ std::set<rpc_address> &min_set,
/*out*/ std::set<rpc_address> &max_set);

class greedy_load_balancer : public simple_load_balancer
{
Expand Down Expand Up @@ -153,9 +156,13 @@ class greedy_load_balancer : public simple_load_balancer

void balance_cluster();

bool cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);
bool cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ migration_list &list);

bool do_cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);
bool do_cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ migration_list &list);

struct app_migration_info
{
Expand Down Expand Up @@ -187,13 +194,12 @@ class greedy_load_balancer : public simple_load_balancer
struct node_migration_info
{
rpc_address address;
// key-disk tag, value-partition set
std::map<std::string, partition_set> partitions;
partition_set future_partitions;
bool operator<(const node_migration_info &another) const
{
if (address < another.address)
return true;
return false;
return address < another.address;
}
bool operator==(const node_migration_info &another) const
{
Expand All @@ -210,6 +216,15 @@ class greedy_load_balancer : public simple_load_balancer
std::map<rpc_address, uint32_t> replicas_count;
};

// Describes a single candidate move; it is the out-parameter of get_next_move() /
// pick_up_move() and the input of apply_move().
// NOTE(review): field meanings below are inferred from the names -- confirm once
// pick_up_move()/apply_move() are implemented.
struct move_info
{
// presumably the partition whose replica is moved
gpid pid;
// presumably the node the replica is moved away from
rpc_address source_node;
// presumably the disk tag of the replica on source_node
std::string source_disk_tag;
// presumably the node that receives the replica
rpc_address target_node;
// kind of balance operation; has no default initializer here, so it should be
// set explicitly before it is read
balance_type type;
};

bool get_cluster_migration_info(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ cluster_migration_info &cluster_info);
Expand All @@ -223,6 +238,22 @@ class greedy_load_balancer : public simple_load_balancer
const app_mapper &all_apps,
/*out*/ node_migration_info &info);

bool get_next_move(const cluster_migration_info &cluster_info,
const partition_set &selected_pid,
/*out*/ move_info &next_move);

bool pick_up_move(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const std::set<rpc_address> &min_nodes,
const int32_t app_id,
const partition_set &selected_pid,
/*out*/ move_info &move_info);

bool apply_move(const move_info &move,
/*out*/ partition_set &selected_pids,
/*out*/ migration_list &list,
/*out*/ cluster_migration_info &cluster_info);

bool all_replica_infos_collected(const node_state &ns);
// using t_global_view to get disk_tag of node's pid
const std::string &get_disk_tag(const dsn::rpc_address &node, const dsn::gpid &pid);
Expand Down Expand Up @@ -251,7 +282,7 @@ class greedy_load_balancer : public simple_load_balancer
FRIEND_TEST(greedy_load_balancer, app_migration_info);
FRIEND_TEST(greedy_load_balancer, node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_skew);
FRIEND_TEST(greedy_load_balancer, get_count);
FRIEND_TEST(greedy_load_balancer, get_partition_count);
FRIEND_TEST(greedy_load_balancer, get_app_migration_info);
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
};
Expand Down
Loading

0 comments on commit e89df8d

Please sign in to comment.