Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): child replica apply private logs, in-memory mutations and catch up parent #319

Merged
merged 21 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class replication_app_base : public replica_base
// routines for replica internal usage
friend class replica;
friend class replica_stub;
friend class mock_replica;

::dsn::error_code open_internal(replica *r);
::dsn::error_code
Expand Down
1 change: 0 additions & 1 deletion src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {
Expand Down
15 changes: 11 additions & 4 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,23 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
uint64_t total_file_size,
decree last_committed_decree);

error_code child_replay_private_log(std::vector<std::string> plog_files,
// TODO(heyuchen): total_file_size is used for split perf-counter in further pull request
hycdong marked this conversation as resolved.
Show resolved Hide resolved
// Applies mutation logs that were learned from the parent of this child.
// This stage follows after that child applies the checkpoint of parent, and begins to apply the
// mutations.
// \param last_committed_decree: parent's last_committed_decree when the checkpoint was
// generated.
error_code child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// child send notification to primary parent when it finish async learn
void child_notify_catch_up();

// return true if parent status is valid
bool parent_check_states();

Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ void replica::catch_up_with_private_logs(partition_status::type s)
[this, err]() { this->on_learn_remote_state_completed(err); },
get_gpid().thread_hash());
_potential_secondary_states.learn_remote_files_completed_task->enqueue();
} else if (s == partition_status::PS_PARTITION_SPLIT) {
_split_states.async_learn_task =
tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
} else {
_secondary_states.checkpoint_completed_task =
tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED,
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ bool partition_split_context::cleanup(bool force)

parent_gpid.set_app_id(0);
is_prepare_list_copied = false;
is_caught_up = false;
return true;
}

Expand Down
6 changes: 4 additions & 2 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,15 @@ typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
class partition_split_context
{
public:
partition_split_context() : is_prepare_list_copied(false) {}
bool cleanup(bool force);
bool is_cleaned() const;

public:
gpid parent_gpid;
bool is_prepare_list_copied;
// whether child has copied parent prepare list
bool is_prepare_list_copied{false};
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
// whether child has catched up with parent during async-learn
bool is_caught_up{false};

// child replica async learn parent states
dsn::task_ptr async_learn_task;
Expand Down
167 changes: 144 additions & 23 deletions src/dist/replication/lib/replica_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void replica::child_init_replica(gpid parent_gpid,
// init split states
_split_states.parent_gpid = parent_gpid;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
_split_states.is_prepare_list_copied = false;
_split_states.is_caught_up = false;

ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid);

Expand Down Expand Up @@ -279,34 +280,28 @@ void replica::child_learn_states(learn_state lstate,
plog_files.size(),
mutation_list.size());

// apply parent checkpoint
error_code err;
auto cleanup = defer([this, &err]() {
if (err != ERR_OK) {
child_handle_async_learn_error();
}
});

// apply parent checkpoint
err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
if (err != ERR_OK) {
derror_replica("failed to apply checkpoint, error={}", err);
return;
}

// replay parent private log
err = child_replay_private_log(plog_files, total_file_size, last_committed_decree);
// replay parent private log and learn in-memory mutations
err =
child_apply_private_logs(plog_files, mutation_list, total_file_size, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to replay private log, error={}", err);
return;
}

// learn parent in-memory mutations
err = child_learn_mutations(mutation_list, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to learn mutations, error={}", err);
return;
}

// generate a checkpoint synchronously
err = _app->sync_checkpoint();
if (err != ERR_OK) {
Expand All @@ -330,29 +325,155 @@ void replica::child_learn_states(learn_state lstate,
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_replay_private_log(std::vector<std::string> plog_files,
error_code replica::child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_replay_private_log",
[](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
}
FAIL_POINT_INJECT_F("replica_child_apply_private_logs", [](dsn::string_view arg) {
return error_code::try_get(arg.data(), ERR_OK);
});

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_mutations", [](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status={}", enum_to_string(status()));
return ERR_INVALID_STATE;
}

error_code ec;
int64_t offset;
// temp prepare_list used for apply states
prepare_list plist(this,
_app->last_committed_decree(),
_options->max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree == _app->last_committed_decree() + 1) {
_app->apply_mutation(mu);
}
});

// replay private log
ec = mutation_log::replay(plog_files,
[this, &plist](int log_length, mutation_ptr &mu) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
return false;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr &&
origin_mu->data.header.ballot >= mu->data.header.ballot) {
return false;
}
plist.prepare(mu, partition_status::PS_SECONDARY);
return true;
},
offset);
if (ec != ERR_OK) {
dwarn_replica(
"replay private_log files failed, file count={}, app last_committed_decree={}",
plog_files.size(),
_app->last_committed_decree());
return ec;
}

ddebug_replica("replay private_log files succeed, file count={}, app last_committed_decree={}",
plog_files.size(),
_app->last_committed_decree());

// apply in-memory mutations if replay private logs succeed
int count = 0;
for (mutation_ptr &mu : mutation_list) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
continue;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr && origin_mu->data.header.ballot >= mu->data.header.ballot) {
continue;
}
if (!mu->is_logged()) {
mu->set_logged();
}
plist.prepare(mu, partition_status::PS_SECONDARY);
++count;
}
plist.commit(last_committed_decree, COMMIT_TO_DECREE_HARD);
ddebug_replica(
"apply in-memory mutations succeed, mutation count={}, app last_committed_decree={}",
count,
_app->last_committed_decree());

return ec;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_catch_up_states() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {});

if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status, status is {}", enum_to_string(status()));
return;
}

// parent will copy mutations to child during async-learn, as a result:
// - child prepare_list last_committed_decree = parent prepare_list last_committed_decree, also
// is catch_up goal_decree
// - local_decree is child local last_committed_decree which is the last decree in async-learn.
decree goal_decree = _prepare_list->last_committed_decree();
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
decree local_decree = _app->last_committed_decree();

// there are mutations written to parent during async-learn
// child does not catch up parent, there are still some mutations child not learn
if (local_decree < goal_decree) {
if (local_decree >= _prepare_list->min_decree()) {
// all missing mutations are all in prepare list
dwarn_replica("there are some in-memory mutations should be learned, app "
"last_committed_decree={}, "
"goal decree={}, prepare_list min_decree={}",
local_decree,
goal_decree,
_prepare_list->min_decree());
for (decree d = local_decree + 1; d <= goal_decree; ++d) {
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "");
error_code ec = _app->apply_mutation(mu);
if (ec != ERR_OK) {
child_handle_split_error("child_catchup failed because apply mutation failed");
return;
}
}
} else {
// some missing mutations have already in private log
// should call `catch_up_with_private_logs` to catch up all missing mutations
dwarn_replica(
"there are some private logs should be learned, app last_committed_decree="
"{}, prepare_list min_decree={}, please wait",
local_decree,
_prepare_list->min_decree());
_split_states.async_learn_task = tasking::enqueue(
LPC_CATCHUP_WITH_PRIVATE_LOGS,
tracker(),
[this]() {
catch_up_with_private_logs(partition_status::PS_PARTITION_SPLIT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catch_up_with_private_logs 后续 PR 建议改个名字,这个函数和 child_apply_private_logs 对代码不熟悉的很容易看混,而且不明其意

_split_states.async_learn_task = nullptr;
},
get_gpid().thread_hash());
return;
}
}

ddebug_replica("child catch up parent states, goal decree={}, local decree={}",
_prepare_list->last_committed_decree(),
_app->last_committed_decree());
_split_states.is_caught_up = true;

child_notify_catch_up();
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_notify_catch_up() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_notify_catch_up", [](dsn::string_view) {});
// TODO(heyuchen): TBD
}

Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/lib/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <fstream>
#include <sstream>
#include <memory>
#include <dsn/utility/fail_point.h>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -456,6 +457,9 @@ int replication_app_base::on_batched_write_requests(int64_t decree,

::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
{
FAIL_POINT_INJECT_F("replication_app_base_apply_mutation",
[](dsn::string_view) { return ERR_OK; });

dassert(mu->data.header.decree == last_committed_decree() + 1,
"invalid mutation decree, decree = %" PRId64 " VS %" PRId64 "",
mu->data.header.decree,
Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ class mock_replica : public replica
void set_init_child_ballot(ballot b) { _child_init_ballot = b; }
void set_last_committed_decree(decree d) { _prepare_list->reset(d); }
prepare_list *get_plist() { return _prepare_list; }
void prepare_list_truncate(decree d) { _prepare_list->truncate(d); }
void prepare_list_commit_hard(decree d) { _prepare_list->commit(d, COMMIT_TO_DECREE_HARD); }
decree get_app_last_committed_decree() { return _app->last_committed_decree(); }
void set_app_last_committed_decree(decree d) { _app->_last_committed_decree = d; }

private:
decree _max_gced_decree{invalid_decree - 1};
Expand Down
Loading