Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: implement greedy_load_balancer::apply_move #882

Merged
merged 8 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 67 additions & 2 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <dsn/utility/math.h>
#include <dsn/utility/utils.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/fail_point.h>
#include "greedy_load_balancer.h"
#include "meta_data.h"
#include "meta_admin_types.h"
Expand Down Expand Up @@ -278,6 +279,8 @@ greedy_load_balancer::generate_balancer_request(const partition_configuration &p
const rpc_address &from,
const rpc_address &to)
{
FAIL_POINT_INJECT_F("generate_balancer_request", [](string_view name) { return nullptr; });

configuration_balancer_request result;
result.gpid = pc.pid;

Expand Down Expand Up @@ -1369,8 +1372,70 @@ bool greedy_load_balancer::apply_move(const move_info &move,
/*out*/ migration_list &list,
/*out*/ cluster_migration_info &cluster_info)
{
// TBD(zlw)
return false;
int32_t app_id = move.pid.get_app_id();
rpc_address source = move.source_node, target = move.target_node;
if (cluster_info.apps_skew.find(app_id) == cluster_info.apps_skew.end() ||
cluster_info.replicas_count.find(source) == cluster_info.replicas_count.end() ||
cluster_info.replicas_count.find(target) == cluster_info.replicas_count.end() ||
cluster_info.apps_info.find(app_id) == cluster_info.apps_info.end()) {
return false;
}

app_migration_info app_info = cluster_info.apps_info[app_id];
if (app_info.partitions.size() <= move.pid.get_partition_index() ||
app_info.replicas_count.find(source) == app_info.replicas_count.end() ||
app_info.replicas_count.find(target) == app_info.replicas_count.end()) {
return false;
}
app_info.replicas_count[source]--;
app_info.replicas_count[target]++;

auto &pmap = app_info.partitions[move.pid.get_partition_index()];
rpc_address primary_addr;
for (const auto &kv : pmap) {
if (kv.second == partition_status::PS_PRIMARY) {
primary_addr = kv.first;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}
}
auto status = cluster_info.type == cluster_balance_type::COPY_SECONDARY
? partition_status::PS_SECONDARY
: partition_status::PS_PRIMARY;
auto iter = pmap.find(source);
if (iter == pmap.end() || iter->second != status) {
return false;
}
pmap.erase(source);
pmap[target] = status;

auto iters = cluster_info.nodes_info.find(source);
auto itert = cluster_info.nodes_info.find(target);
if (iters == cluster_info.nodes_info.end() || itert == cluster_info.nodes_info.end()) {
return false;
}
node_migration_info node_source = iters->second;
node_migration_info node_target = itert->second;
auto it = node_source.partitions.find(move.source_disk_tag);
if (it == node_source.partitions.end()) {
return false;
}
it->second.erase(move.pid);
node_target.future_partitions.insert(move.pid);

// add into migration list and selected_pid
partition_configuration pc;
pc.pid = move.pid;
pc.primary = primary_addr;
list[move.pid] = generate_balancer_request(pc, move.type, source, target);
t_migration_result->emplace(move.pid, generate_balancer_request(pc, move.type, source, target));
selected_pids.insert(move.pid);

cluster_info.apps_skew[app_id] = get_skew(app_info.replicas_count);
cluster_info.apps_info[app_id] = app_info;
cluster_info.nodes_info[source] = node_source;
cluster_info.nodes_info[target] = node_target;
cluster_info.replicas_count[source]--;
cluster_info.replicas_count[target]++;
return true;
}

bool greedy_load_balancer::balance(meta_view view, migration_list &list)
Expand Down
1 change: 1 addition & 0 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ class greedy_load_balancer : public simple_load_balancer
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_disk_partitions_map);
FRIEND_TEST(greedy_load_balancer, get_max_load_disk);
FRIEND_TEST(greedy_load_balancer, apply_move);
FRIEND_TEST(greedy_load_balancer, pick_up_partition);
};

Expand Down
87 changes: 87 additions & 0 deletions src/meta/test/greedy_load_balancer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <gtest/gtest.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/fail_point.h>
#include "meta/greedy_load_balancer.h"

namespace dsn {
Expand Down Expand Up @@ -260,6 +261,92 @@ TEST(greedy_load_balancer, get_max_load_disk)
ASSERT_EQ(target_partitions.count(pid), 1);
}

TEST(greedy_load_balancer, apply_move)
{
struct greedy_load_balancer::move_info minfo;
int32_t app_id = 1;
int32_t partition_index = 1;
minfo.pid = gpid(app_id, partition_index);
rpc_address source_node(1, 10086);
minfo.source_node = source_node;
std::string disk_tag = "disk1";
minfo.source_disk_tag = disk_tag;
rpc_address target_node(2, 10086);
minfo.target_node = target_node;
minfo.type = greedy_load_balancer::balance_type::move_primary;

greedy_load_balancer balancer(nullptr);
greedy_load_balancer::cluster_migration_info cluster_info;
cluster_info.type = cluster_balance_type::COPY_SECONDARY;
partition_set selected_pids;
migration_list list;
balancer.t_migration_result = &list;

// target_node is not found in cluster_info.replicas_count
auto res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// source_node is not found in cluster_info.replicas_count
cluster_info.apps_skew[app_id] = 1;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// target_node is not found in cluster_info.replicas_count
cluster_info.replicas_count[source_node] = 1;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// app_id is not found in cluster_info.app_skew
cluster_info.replicas_count[target_node] = 1;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// source_node and target_node are not found in app_info
greedy_load_balancer::app_migration_info app_info;
cluster_info.apps_info[app_id] = app_info;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// app_info.partitions.size() < partition_index
app_info.replicas_count[target_node] = 1;
app_info.replicas_count[source_node] = 1;
cluster_info.apps_info[app_id] = app_info;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// all of the partition status are not PS_SECONDARY
std::map<rpc_address, partition_status::type> partition_status;
partition_status[source_node] = partition_status::type::PS_PRIMARY;
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// target_node and source_node are not found in cluster_info.nodes_info
partition_status[source_node] = partition_status::type::PS_SECONDARY;
cluster_info.apps_info[app_id].partitions.clear();
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// disk_tag is not found in node_info
greedy_load_balancer::node_migration_info target_info;
greedy_load_balancer::node_migration_info source_info;
cluster_info.nodes_info[target_node] = target_info;
cluster_info.nodes_info[source_node] = source_info;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

fail::setup();
fail::cfg("generate_balancer_request", "return()");
partition_set source_partition_set;
cluster_info.nodes_info[source_node].partitions[disk_tag] = source_partition_set;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
fail::teardown();
ASSERT_TRUE(res);
}

TEST(greedy_load_balancer, pick_up_partition)
{
greedy_load_balancer::cluster_migration_info cluster_info;
Expand Down