diff --git a/include/dsn/dist/replication/replication_app_base.h b/include/dsn/dist/replication/replication_app_base.h index 244a555315..97c1398020 100644 --- a/include/dsn/dist/replication/replication_app_base.h +++ b/include/dsn/dist/replication/replication_app_base.h @@ -244,6 +244,7 @@ class replication_app_base : public replica_base // routines for replica internal usage friend class replica; friend class replica_stub; + friend class mock_replica; ::dsn::error_code open_internal(replica *r); ::dsn::error_code diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index afea26a7ae..9a69e811c2 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -33,7 +33,6 @@ #include #include #include -#include namespace dsn { namespace replication { diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 59bd33455a..59055341f2 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -372,16 +372,23 @@ class replica : public serverlet, public ref_counter, public replica_ba uint64_t total_file_size, decree last_committed_decree); - error_code child_replay_private_log(std::vector plog_files, + // TODO(heyuchen): total_file_size is used for split perf-counter in further pull request + // Applies mutation logs that were learned from the parent of this child. + // This stage follows after that child applies the checkpoint of parent, and begins to apply the + // mutations. + // \param last_committed_decree: parent's last_committed_decree when the checkpoint was + // generated. + error_code child_apply_private_logs(std::vector plog_files, + std::vector mutation_list, uint64_t total_file_size, decree last_committed_decree); - error_code child_learn_mutations(std::vector mutation_list, - decree last_committed_decree); - // child catch up parent states while executing async learn task void child_catch_up_states(); + // child send notification to primary parent when it finish async learn + void child_notify_catch_up(); + // return true if parent status is valid bool parent_check_states(); diff --git a/src/dist/replication/lib/replica_chkpt.cpp b/src/dist/replication/lib/replica_chkpt.cpp index 69d9156d40..f60748fc17 100644 --- a/src/dist/replication/lib/replica_chkpt.cpp +++ b/src/dist/replication/lib/replica_chkpt.cpp @@ -372,6 +372,12 @@ void replica::catch_up_with_private_logs(partition_status::type s) [this, err]() { this->on_learn_remote_state_completed(err); }, get_gpid().thread_hash()); _potential_secondary_states.learn_remote_files_completed_task->enqueue(); + } else if (s == partition_status::PS_PARTITION_SPLIT) { + _split_states.async_learn_task = + tasking::enqueue(LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica::child_catch_up_states, this), + get_gpid().thread_hash()); } else { _secondary_states.checkpoint_completed_task = tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED, diff --git a/src/dist/replication/lib/replica_context.cpp b/src/dist/replication/lib/replica_context.cpp index 8cee5ed705..e014115e8e 100644 --- a/src/dist/replication/lib/replica_context.cpp +++ b/src/dist/replication/lib/replica_context.cpp @@ -1295,6 +1295,7 @@ bool partition_split_context::cleanup(bool force) parent_gpid.set_app_id(0); is_prepare_list_copied = false; + is_caught_up = false; return true; } diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index cdf505edc5..ae5eee773e 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -522,13 +522,15 @@ typedef dsn::ref_ptr cold_backup_context_ptr; class partition_split_context { public: - partition_split_context() : is_prepare_list_copied(false) {} bool cleanup(bool force); bool is_cleaned() const; public: gpid parent_gpid; - bool is_prepare_list_copied; + // whether child has copied parent prepare list + bool is_prepare_list_copied{false}; + // whether child has catched up with parent during async-learn + bool is_caught_up{false}; // child replica async learn parent states dsn::task_ptr async_learn_task; diff --git a/src/dist/replication/lib/replica_split.cpp b/src/dist/replication/lib/replica_split.cpp index 55288aa516..dc12ba4df6 100644 --- a/src/dist/replication/lib/replica_split.cpp +++ b/src/dist/replication/lib/replica_split.cpp @@ -95,6 +95,7 @@ void replica::child_init_replica(gpid parent_gpid, // init split states _split_states.parent_gpid = parent_gpid; _split_states.is_prepare_list_copied = false; + _split_states.is_caught_up = false; ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid); @@ -279,7 +280,6 @@ void replica::child_learn_states(learn_state lstate, plog_files.size(), mutation_list.size()); - // apply parent checkpoint error_code err; auto cleanup = defer([this, &err]() { if (err != ERR_OK) { @@ -287,26 +287,21 @@ void replica::child_learn_states(learn_state lstate, } }); + // apply parent checkpoint err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate); if (err != ERR_OK) { derror_replica("failed to apply checkpoint, error={}", err); return; } - // replay parent private log - err = child_replay_private_log(plog_files, total_file_size, last_committed_decree); + // replay parent private log and learn in-memory mutations + err = + child_apply_private_logs(plog_files, mutation_list, total_file_size, last_committed_decree); if (err != ERR_OK) { derror_replica("failed to replay private log, error={}", err); return; } - // learn parent in-memory mutations - err = child_learn_mutations(mutation_list, last_committed_decree); - if (err != ERR_OK) { - derror_replica("failed to learn mutations, error={}", err); - return; - } - // generate a checkpoint synchronously err = _app->sync_checkpoint(); if (err != ERR_OK) { @@ -330,29 +325,155 @@ void replica::child_learn_states(learn_state lstate, } // ThreadPool: THREAD_POOL_REPLICATION_LONG -error_code replica::child_replay_private_log(std::vector plog_files, +error_code replica::child_apply_private_logs(std::vector plog_files, + std::vector mutation_list, uint64_t total_file_size, decree last_committed_decree) // on child partition { - FAIL_POINT_INJECT_F("replica_child_replay_private_log", - [](dsn::string_view) { return ERR_OK; }); - // TODO(heyuchen): TBD - return ERR_OK; -} + FAIL_POINT_INJECT_F("replica_child_apply_private_logs", [](dsn::string_view arg) { + return error_code::try_get(arg.data(), ERR_OK); + }); -// ThreadPool: THREAD_POOL_REPLICATION_LONG -error_code replica::child_learn_mutations(std::vector mutation_list, - decree last_committed_decree) // on child partition -{ - FAIL_POINT_INJECT_F("replica_child_learn_mutations", [](dsn::string_view) { return ERR_OK; }); - // TODO(heyuchen): TBD - return ERR_OK; + if (status() != partition_status::PS_PARTITION_SPLIT) { + dwarn_replica("wrong status={}", enum_to_string(status())); + return ERR_INVALID_STATE; + } + + error_code ec; + int64_t offset; + // temp prepare_list used for apply states + prepare_list plist(this, + _app->last_committed_decree(), + _options->max_mutation_count_in_prepare_list, + [this](mutation_ptr &mu) { + if (mu->data.header.decree == _app->last_committed_decree() + 1) { + _app->apply_mutation(mu); + } + }); + + // replay private log + ec = mutation_log::replay(plog_files, + [this, &plist](int log_length, mutation_ptr &mu) { + decree d = mu->data.header.decree; + if (d <= plist.last_committed_decree()) { + return false; + } + mutation_ptr origin_mu = plist.get_mutation_by_decree(d); + if (origin_mu != nullptr && + origin_mu->data.header.ballot >= mu->data.header.ballot) { + return false; + } + plist.prepare(mu, partition_status::PS_SECONDARY); + return true; + }, + offset); + if (ec != ERR_OK) { + dwarn_replica( + "replay private_log files failed, file count={}, app last_committed_decree={}", + plog_files.size(), + _app->last_committed_decree()); + return ec; + } + + ddebug_replica("replay private_log files succeed, file count={}, app last_committed_decree={}", + plog_files.size(), + _app->last_committed_decree()); + + // apply in-memory mutations if replay private logs succeed + int count = 0; + for (mutation_ptr &mu : mutation_list) { + decree d = mu->data.header.decree; + if (d <= plist.last_committed_decree()) { + continue; + } + mutation_ptr origin_mu = plist.get_mutation_by_decree(d); + if (origin_mu != nullptr && origin_mu->data.header.ballot >= mu->data.header.ballot) { + continue; + } + if (!mu->is_logged()) { + mu->set_logged(); + } + plist.prepare(mu, partition_status::PS_SECONDARY); + ++count; + } + plist.commit(last_committed_decree, COMMIT_TO_DECREE_HARD); + ddebug_replica( + "apply in-memory mutations succeed, mutation count={}, app last_committed_decree={}", + count, + _app->last_committed_decree()); + + return ec; } // ThreadPool: THREAD_POOL_REPLICATION void replica::child_catch_up_states() // on child partition { FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {}); + + if (status() != partition_status::PS_PARTITION_SPLIT) { + dwarn_replica("wrong status, status is {}", enum_to_string(status())); + return; + } + + // parent will copy mutations to child during async-learn, as a result: + // - child prepare_list last_committed_decree = parent prepare_list last_committed_decree, also + // is catch_up goal_decree + // - local_decree is child local last_committed_decree which is the last decree in async-learn. + decree goal_decree = _prepare_list->last_committed_decree(); + decree local_decree = _app->last_committed_decree(); + + // there are mutations written to parent during async-learn + // child does not catch up parent, there are still some mutations child not learn + if (local_decree < goal_decree) { + if (local_decree >= _prepare_list->min_decree()) { + // all missing mutations are all in prepare list + dwarn_replica("there are some in-memory mutations should be learned, app " + "last_committed_decree={}, " + "goal decree={}, prepare_list min_decree={}", + local_decree, + goal_decree, + _prepare_list->min_decree()); + for (decree d = local_decree + 1; d <= goal_decree; ++d) { + auto mu = _prepare_list->get_mutation_by_decree(d); + dassert(mu != nullptr, ""); + error_code ec = _app->apply_mutation(mu); + if (ec != ERR_OK) { + child_handle_split_error("child_catchup failed because apply mutation failed"); + return; + } + } + } else { + // some missing mutations have already in private log + // should call `catch_up_with_private_logs` to catch up all missing mutations + dwarn_replica( + "there are some private logs should be learned, app last_committed_decree=" + "{}, prepare_list min_decree={}, please wait", + local_decree, + _prepare_list->min_decree()); + _split_states.async_learn_task = tasking::enqueue( + LPC_CATCHUP_WITH_PRIVATE_LOGS, + tracker(), + [this]() { + catch_up_with_private_logs(partition_status::PS_PARTITION_SPLIT); + _split_states.async_learn_task = nullptr; + }, + get_gpid().thread_hash()); + return; + } + } + + ddebug_replica("child catch up parent states, goal decree={}, local decree={}", + _prepare_list->last_committed_decree(), + _app->last_committed_decree()); + _split_states.is_caught_up = true; + + child_notify_catch_up(); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica::child_notify_catch_up() // on child partition +{ + FAIL_POINT_INJECT_F("replica_child_notify_catch_up", [](dsn::string_view) {}); // TODO(heyuchen): TBD } diff --git a/src/dist/replication/lib/replication_app_base.cpp b/src/dist/replication/lib/replication_app_base.cpp index 9a46038504..d524b840fd 100644 --- a/src/dist/replication/lib/replication_app_base.cpp +++ b/src/dist/replication/lib/replication_app_base.cpp @@ -35,6 +35,7 @@ #include #include #include +#include namespace dsn { namespace replication { @@ -456,6 +457,9 @@ int replication_app_base::on_batched_write_requests(int64_t decree, ::dsn::error_code replication_app_base::apply_mutation(const mutation *mu) { + FAIL_POINT_INJECT_F("replication_app_base_apply_mutation", + [](dsn::string_view) { return ERR_OK; }); + dassert(mu->data.header.decree == last_committed_decree() + 1, "invalid mutation decree, decree = %" PRId64 " VS %" PRId64 "", mu->data.header.decree, diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 505bcb0126..e368966dd1 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -132,6 +132,10 @@ class mock_replica : public replica void set_init_child_ballot(ballot b) { _child_init_ballot = b; } void set_last_committed_decree(decree d) { _prepare_list->reset(d); } prepare_list *get_plist() { return _prepare_list; } + void prepare_list_truncate(decree d) { _prepare_list->truncate(d); } + void prepare_list_commit_hard(decree d) { _prepare_list->commit(d, COMMIT_TO_DECREE_HARD); } + decree get_app_last_committed_decree() { return _app->last_committed_decree(); } + void set_app_last_committed_decree(decree d) { _app->_last_committed_decree = d; } private: decree _max_gced_decree{invalid_decree - 1}; diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 644034a7e4..db7931f3b2 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -67,8 +67,8 @@ class replica_split_test : public testing::Test void mock_prepare_list(mock_replica_ptr rep, bool add_to_plog) { - _mock_plist = new prepare_list(rep, 0, _max_count, nullptr); - for (int i = 0; i < _max_count; ++i) { + _mock_plist = new prepare_list(rep, 1, _max_count, [](mutation_ptr mu) {}); + for (int i = 1; i < _max_count + 1; ++i) { mutation_ptr mu = new mutation(); mu->data.header.decree = i; mu->data.header.ballot = _init_ballot; @@ -88,13 +88,26 @@ class replica_split_test : public testing::Test mock_prepare_list(_parent, true); } - void mock_child_split_context(gpid parent_gpid, bool is_prepare_list_copied) + void mock_child_split_context(gpid parent_gpid, bool is_prepare_list_copied, bool is_caught_up) { _child->_split_states.parent_gpid = parent_gpid; _child->_split_states.is_prepare_list_copied = is_prepare_list_copied; + _child->_split_states.is_caught_up = is_caught_up; } - void mock_child_async_learn_states(mock_replica_ptr plist_rep, bool add_to_plog) + void mock_mutation_list(decree min_decree) + { + // mock mutation list + for (int d = 1; d < _max_count; ++d) { + mutation_ptr mu = _mock_plist->get_mutation_by_decree(d); + if (d > min_decree) { + _mutation_list.push_back(mu); + } + } + } + + void + mock_child_async_learn_states(mock_replica_ptr plist_rep, bool add_to_plog, decree min_decree) { mock_shared_log(); mock_private_log(_child_pid, _child, false); @@ -105,12 +118,7 @@ class replica_split_test : public testing::Test // mock parent private log files _private_log_files.push_back("log.1.0.txt"); // mock mutation list - for (int d = 0; d < _max_count; ++d) { - mutation_ptr mu = _mock_plist->get_mutation_by_decree(d); - if (d > _decree) { - _mutation_list.push_back(mu); - } - } + mock_mutation_list(min_decree); } void cleanup_prepare_list(mock_replica_ptr rep) { rep->_prepare_list->reset(0); } @@ -139,7 +147,7 @@ class replica_split_test : public testing::Test void test_child_copy_prepare_list() { - mock_child_async_learn_states(_parent, false); + mock_child_async_learn_states(_parent, false, _decree); std::shared_ptr plist = std::make_shared(_parent, *_mock_plist); _child->child_copy_prepare_list(_mock_learn_state, _mutation_list, @@ -151,12 +159,34 @@ class replica_split_test : public testing::Test void test_child_learn_states() { - mock_child_async_learn_states(_child, true); + mock_child_async_learn_states(_child, true, _decree); _child->child_learn_states( _mock_learn_state, _mutation_list, _private_log_files, _total_file_size, _decree); _child->tracker()->wait_outstanding_tasks(); } + void test_child_apply_private_logs() + { + mock_child_async_learn_states(_child, true, 0); + _child->child_apply_private_logs( + _private_log_files, _mutation_list, _total_file_size, _decree); + _child->tracker()->wait_outstanding_tasks(); + } + + void test_child_catch_up_states(decree local_decree, decree goal_decree, decree min_decree) + { + mock_child_async_learn_states(_child, true, 0); + _child->set_app_last_committed_decree(local_decree); + if (local_decree < goal_decree) { + // set prepare_list's start_decree = {min_decree} + _child->prepare_list_truncate(min_decree); + // set prepare_list's last_committed_decree = {goal_decree} + _child->prepare_list_commit_hard(goal_decree); + } + _child->child_catch_up_states(); + _child->tracker()->wait_outstanding_tasks(); + } + public: std::unique_ptr _stub; @@ -240,7 +270,7 @@ TEST_F(replica_split_test, parent_prepare_states_succeed) TEST_F(replica_split_test, copy_prepare_list_with_wrong_status) { generate_child(partition_status::PS_INACTIVE); - mock_child_split_context(_parent_pid, false); + mock_child_split_context(_parent_pid, false, false); fail::setup(); fail::cfg("replica_child_learn_states", "return()"); @@ -254,7 +284,7 @@ TEST_F(replica_split_test, copy_prepare_list_with_wrong_status) TEST_F(replica_split_test, copy_prepare_list_succeed) { generate_child(partition_status::PS_PARTITION_SPLIT); - mock_child_split_context(_parent_pid, false); + mock_child_split_context(_parent_pid, false, false); fail::setup(); fail::cfg("replica_child_learn_states", "return()"); @@ -273,11 +303,25 @@ TEST_F(replica_split_test, copy_prepare_list_succeed) TEST_F(replica_split_test, learn_states_succeed) { generate_child(partition_status::PS_PARTITION_SPLIT); - mock_child_split_context(_parent_pid, true); + mock_child_split_context(_parent_pid, true, false); + + fail::setup(); + fail::cfg("replica_child_apply_private_logs", "return()"); + fail::cfg("replica_child_catch_up_states", "return()"); + test_child_learn_states(); + fail::teardown(); + + cleanup_prepare_list(_child); + cleanup_child_split_context(); +} + +TEST_F(replica_split_test, learn_states_with_replay_private_log_error) +{ + generate_child(partition_status::PS_PARTITION_SPLIT); + mock_child_split_context(_parent_pid, true, false); fail::setup(); - fail::cfg("replica_child_replay_private_log", "return()"); - fail::cfg("replica_child_learn_mutations", "return()"); + fail::cfg("replica_child_apply_private_logs", "return(error)"); fail::cfg("replica_child_catch_up_states", "return()"); test_child_learn_states(); fail::teardown(); @@ -286,7 +330,55 @@ TEST_F(replica_split_test, learn_states_succeed) cleanup_child_split_context(); } -// TODO(heyuchen): add learn_states failed case +TEST_F(replica_split_test, child_apply_private_logs_succeed) +{ + generate_child(partition_status::PS_PARTITION_SPLIT); + mock_child_split_context(_parent_pid, true, false); + + fail::setup(); + fail::cfg("mutation_log_replay_succeed", "return()"); + fail::cfg("replication_app_base_apply_mutation", "return()"); + test_child_apply_private_logs(); + fail::teardown(); + + cleanup_prepare_list(_child); + cleanup_child_split_context(); +} + +TEST_F(replica_split_test, catch_up_succeed_with_all_states_learned) +{ + generate_child(partition_status::PS_PARTITION_SPLIT); + mock_child_split_context(_parent_pid, true, false); + + fail::setup(); + fail::cfg("replica_child_notify_catch_up", "return()"); + test_child_catch_up_states(_decree, _decree, _decree); + fail::teardown(); + + partition_split_context split_context = get_split_context(); + ASSERT_EQ(split_context.is_caught_up, true); + + cleanup_prepare_list(_child); + cleanup_child_split_context(); +} + +TEST_F(replica_split_test, catch_up_succeed_with_learn_in_memory_mutations) +{ + generate_child(partition_status::PS_PARTITION_SPLIT); + mock_child_split_context(_parent_pid, true, false); + + fail::setup(); + fail::cfg("replica_child_notify_catch_up", "return()"); + fail::cfg("replication_app_base_apply_mutation", "return()"); + test_child_catch_up_states(_decree, _max_count - 1, 1); + fail::teardown(); + + partition_split_context split_context = get_split_context(); + ASSERT_EQ(split_context.is_caught_up, true); + + cleanup_prepare_list(_child); + cleanup_child_split_context(); +} } // namespace replication } // namespace dsn