Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(split): meta failover during partition split (#764)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Mar 17, 2021
1 parent a6b1c6c commit 2ebdfce
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class meta_service : public serverlet<meta_service>
friend class meta_load_balance_test;
friend class meta_service_test;
friend class meta_service_test_app;
friend class meta_split_service_test;
friend class meta_test_base;
friend class policy_context_test;
friend class test::test_checker;
Expand Down
27 changes: 23 additions & 4 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,27 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
}
}
} else if (ec == ERR_OBJECT_NOT_FOUND) {
dwarn("partition node %s not exist on remote storage, may half create before",
partition_path.c_str());
init_app_partition_node(app, partition_id, nullptr);
auto init_partition_count = app->init_partition_count > 0
? app->init_partition_count
: app->partition_count;
if (partition_id < init_partition_count) {
dwarn_f(
"partition node {} not exist on remote storage, may half create before",
partition_path);
init_app_partition_node(app, partition_id, nullptr);
} else if (partition_id >= app->partition_count / 2) {
dwarn_f(
"partition node {} not exist on remote storage, may half split before",
partition_path);
zauto_write_lock l(_lock);
app->helpers->split_states.status[partition_id - app->partition_count / 2] =
split_status::SPLITTING;
app->helpers->split_states.splitting_count++;
app->partitions[partition_id].ballot = invalid_ballot;
app->partitions[partition_id].pid = gpid(app->app_id, partition_id);
process_one_partition(app);
}

} else {
derror("get partition node failed, reason(%s)", ec.to_string());
err = ec;
Expand Down Expand Up @@ -623,7 +641,7 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
app->get_logname());
}
}

app->helpers->split_states.splitting_count = 0;
for (int i = 0; i < app->partition_count; i++) {
std::string partition_path =
app_path + "/" + boost::lexical_cast<std::string>(i);
Expand Down Expand Up @@ -1094,6 +1112,7 @@ void server_state::create_app(dsn::message_ex *msg)
info.partition_count = request.options.partition_count;
info.status = app_status::AS_CREATING;
info.create_second = dsn_now_ms() / 1000;
info.init_partition_count = request.options.partition_count;

app = app_state::create(info);
app->helpers->pending_response = msg;
Expand Down
1 change: 1 addition & 0 deletions src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class server_state
friend class meta_duplication_service_test;
friend class meta_load_balance_test;
friend class meta_split_service;
friend class meta_split_service_test;
friend class meta_service_test_app;
friend class meta_test_base;
friend class test::test_checker;
Expand Down
104 changes: 104 additions & 0 deletions src/meta/test/meta_split_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@

#include <gtest/gtest.h>
#include <dsn/service_api_c.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replica_envs.h>

#include "meta_service_test_app.h"
#include "meta_test_base.h"
#include "meta/meta_split_service.h"
#include "meta/meta_server_failure_detector.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -252,6 +254,90 @@ class meta_split_service_test : public meta_test_base
}
}

// Builds a started fake meta service whose remote storage is pre-populated
// with a mock splitting app, then lets server_state build its in-memory
// structures from it.
//
// On return `_ms` owns the service (split service and failure detector
// attached) and `_ss` refers to that service's server_state.
void initialize_meta_server_with_mock_app()
{
    // Stand up the fake meta service together with its remote storage.
    auto fake_svc = new fake_receiver_meta_service();
    fake_svc->remote_storage_initialize();

    // Root server_state at "<cluster_root>/apps" and mark the service started.
    auto svr_state = fake_svc->_state;
    svr_state->initialize(fake_svc, fake_svc->_cluster_root + "/apps");
    fake_svc->_started = true;

    // Hand ownership to the fixture before any helper dereferences `_ms`.
    _ms.reset(fake_svc);

    // Attach the partition split service.
    _ms->_split_svc = make_unique<meta_split_service>(_ms.get());

    // Seed remote storage with the splitting app first; only afterwards ask
    // server_state to initialize its data structures.
    create_splitting_app_on_remote_stroage(svr_state->_apps_root);
    svr_state->initialize_data_structure();

    _ms->_failure_detector.reset(new meta_server_failure_detector(_ms.get()));
    _ss = _ms->_state;
}

void create_splitting_app_on_remote_stroage(const std::string &app_root)
{
static const char *lock_state = "lock";
static const char *unlock_state = "unlock";
std::string path = app_root;

_ms->get_meta_storage()->create_node(
std::move(path), blob(lock_state, 0, strlen(lock_state)), [this, &app_root]() {
ddebug_f("create app root {}", app_root);
});
wait_all();

// create splitting app
app_info ainfo;
ainfo.app_id = 1;
ainfo.app_name = NAME;
ainfo.app_type = "pegasus";
ainfo.is_stateful = true;
ainfo.max_replica_count = 3;
ainfo.partition_count = NEW_PARTITION_COUNT;
ainfo.init_partition_count = PARTITION_COUNT;
ainfo.status = app_status::AS_AVAILABLE;

blob value = json::json_forwarder<app_info>::encode(ainfo);
_ms->get_meta_storage()->create_node(
app_root + "/" + boost::lexical_cast<std::string>(ainfo.app_id),
std::move(value),
[this, &app_root, &ainfo]() {
ddebug_f("create app({}) app_id={}, dir succeed", ainfo.app_name, ainfo.app_id);
for (int i = 0; i < ainfo.init_partition_count; ++i) {
create_partition_configuration_on_remote_storage(app_root, ainfo.app_id, i);
}
create_partition_configuration_on_remote_storage(
app_root, ainfo.app_id, CHILD_INDEX);
});
wait_all();

std::string root = app_root;
_ms->get_meta_storage()->set_data(
std::move(root), blob(unlock_state, 0, strlen(unlock_state)), []() {});
wait_all();
}

void create_partition_configuration_on_remote_storage(const std::string &app_root,
const int32_t app_id,
const int32_t pidx)
{
partition_configuration config;
config.max_replica_count = 3;
config.pid = gpid(app_id, pidx);
config.ballot = PARENT_BALLOT;
blob value = json::json_forwarder<partition_configuration>::encode(config);
_ms->get_meta_storage()->create_node(
app_root + "/" + boost::lexical_cast<std::string>(app_id) + "/" +
boost::lexical_cast<std::string>(pidx),
std::move(value),
[app_id, pidx, this]() {
ddebug_f("create app({}), partition({}.{}) dir succeed", NAME, app_id, pidx);
});
}

// App name shared by the split tests; the mock splitting app is created with it.
const std::string NAME = "split_table";
// Used as init_partition_count of the mock splitting app.
const int32_t PARTITION_COUNT = 4;
// Used as partition_count of the mock splitting app (twice PARTITION_COUNT).
const int32_t NEW_PARTITION_COUNT = 8;
Expand Down Expand Up @@ -711,5 +797,23 @@ TEST_F(meta_split_service_test, query_child_state_test)
}
}

// Fixture for the meta failover tests. It reuses the helpers of
// meta_split_service_test but performs no per-test setup of its own.
class meta_split_service_failover_test : public meta_split_service_test
{
public:
    // Deliberately a no-op: half_split_test builds its own meta server by
    // calling initialize_meta_server_with_mock_app().
    void SetUp()
    {
    }

    // Tear-down is delegated directly to meta_test_base.
    void TearDown()
    {
        meta_test_base::TearDown();
    }
};

// Starts a meta server on top of remote storage holding a mock splitting app
// (partition nodes [0, PARTITION_COUNT) plus CHILD_INDEX) and checks the
// split state the server ends up with.
TEST_F(meta_split_service_failover_test, half_split_test)
{
    initialize_meta_server_with_mock_app();
    auto app = find_app(NAME);
    // Fail the test cleanly instead of dereferencing a null app when the
    // mock app could not be loaded.
    ASSERT_TRUE(app != nullptr);

    // Bind by reference: the assertions only read the state, no copy needed.
    const auto &split_states = app->helpers->split_states;
    // One child node (CHILD_INDEX) was written, so all but one of the
    // PARTITION_COUNT parents are expected to be counted as splitting.
    ASSERT_EQ(split_states.splitting_count, PARTITION_COUNT - 1);
    // PARENT_INDEX must have no status entry.
    // NOTE(review): presumably PARENT_INDEX is the parent of CHILD_INDEX - confirm.
    ASSERT_EQ(split_states.status.find(PARENT_INDEX), split_states.status.end());
    ASSERT_EQ(app->partition_count, NEW_PARTITION_COUNT);
    ASSERT_EQ(app->partitions.size(), NEW_PARTITION_COUNT);
}

} // namespace replication
} // namespace dsn

0 comments on commit 2ebdfce

Please sign in to comment.