Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): child replica apply private logs, in-memory mutations and catch up parent #319

Merged
merged 21 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {
Expand Down
11 changes: 7 additions & 4 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,19 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
uint64_t total_file_size,
decree last_committed_decree);

error_code child_replay_private_log(std::vector<std::string> plog_files,
// TODO(heyuchen): total_file_size is used for split perf-counter in further pull request
hycdong marked this conversation as resolved.
Show resolved Hide resolved
// child replay private logs and learn in-memory mutations
error_code child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// child send notification to primary parent when it finish async learn
void child_notify_catch_up();

// return true if parent status is valid
bool parent_check_states();

Expand Down
9 changes: 9 additions & 0 deletions src/dist/replication/lib/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"
#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -328,6 +329,8 @@ error_code replica::background_sync_checkpoint()
// in non-replication thread
void replica::catch_up_with_private_logs(partition_status::type s)
{
FAIL_POINT_INJECT_F("replica_chkpt_catch_up_with_private_logs", [this](dsn::string_view) {});

learn_state state;
_private_log->get_learn_state(get_gpid(), _app->last_committed_decree() + 1, state);

Expand All @@ -340,6 +343,12 @@ void replica::catch_up_with_private_logs(partition_status::type s)
[this, err]() { this->on_learn_remote_state_completed(err); },
get_gpid().thread_hash());
_potential_secondary_states.learn_remote_files_completed_task->enqueue();
} else if (s == partition_status::PS_PARTITION_SPLIT) {
_split_states.async_learn_task =
tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
} else {
_secondary_states.checkpoint_completed_task =
tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED,
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,7 @@ bool partition_split_context::cleanup(bool force)

parent_gpid.set_app_id(0);
is_prepare_list_copied = false;
is_caught_up = false;
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,13 +526,14 @@ typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
class partition_split_context
{
public:
partition_split_context() : is_prepare_list_copied(false) {}
partition_split_context() : is_prepare_list_copied(false), is_caught_up(false) {}
bool cleanup(bool force);
bool is_cleaned() const;

public:
gpid parent_gpid;
bool is_prepare_list_copied;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public 的变量务必要写注释, 之前 codereview 规范不严格, 现在 is_prepare_list_copied 具体指的是啥我都忘了. 另外 is_caught_up 是做什么用的?我现在只看到了有对这个变量 set,没有看到 get。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个变量后面的pr会get

bool is_caught_up;
hycdong marked this conversation as resolved.
Show resolved Hide resolved

// child replica async learn parent states
dsn::task_ptr async_learn_task;
Expand Down
158 changes: 135 additions & 23 deletions src/dist/replication/lib/replica_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void replica::child_init_replica(gpid parent_gpid,
// init split states
_split_states.parent_gpid = parent_gpid;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
_split_states.is_prepare_list_copied = false;
_split_states.is_caught_up = false;

ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid);

Expand Down Expand Up @@ -279,34 +280,28 @@ void replica::child_learn_states(learn_state lstate,
plog_files.size(),
mutation_list.size());

// apply parent checkpoint
error_code err;
auto cleanup = defer([this, &err]() {
if (err != ERR_OK) {
child_handle_async_learn_error();
}
});

// apply parent checkpoint
err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
if (err != ERR_OK) {
derror_replica("failed to apply checkpoint, error={}", err);
return;
}

// replay parent private log
err = child_replay_private_log(plog_files, total_file_size, last_committed_decree);
// replay parent private log and learn in-memory mutations
err =
child_apply_private_logs(plog_files, mutation_list, total_file_size, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to replay private log, error={}", err);
return;
}

// learn parent in-memory mutations
err = child_learn_mutations(mutation_list, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to learn mutations, error={}", err);
return;
}

// generate a checkpoint synchronously
err = _app->sync_checkpoint();
if (err != ERR_OK) {
Expand All @@ -330,30 +325,147 @@ void replica::child_learn_states(learn_state lstate,
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_replay_private_log(std::vector<std::string> plog_files,
error_code replica::child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_replay_private_log",
FAIL_POINT_INJECT_F("replica_child_apply_private_logs",
[](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
}
FAIL_POINT_INJECT_F("replica_child_apply_private_logs_error",
hycdong marked this conversation as resolved.
Show resolved Hide resolved
[](dsn::string_view) { return ERR_FILE_OPERATION_FAILED; });

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_mutations", [](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status=", enum_to_string(status()));
hycdong marked this conversation as resolved.
Show resolved Hide resolved
return ERR_INVALID_STATE;
}

error_code ec;
int64_t offset;
// temp prepare_list used for apply states
prepare_list plist(this,
_app->last_committed_decree(),
_options->max_mutation_count_in_prepare_list,
[this, &ec](mutation_ptr &mu) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
if (mu->data.header.decree == _app->last_committed_decree() + 1) {
_app->apply_mutation(mu);
}
});

// replay private log
ec = mutation_log::replay(
plog_files,
[this, &plist](int log_length, mutation_ptr &mu) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
return false;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr && origin_mu->data.header.ballot >= mu->data.header.ballot) {
dwarn_replica("mutation({}) existed, origin mutation={}, ignore obsolete mutation",
hycdong marked this conversation as resolved.
Show resolved Hide resolved
mu->name(),
origin_mu->data.header.ballot);
return false;
}
plist.prepare(mu, partition_status::PS_SECONDARY);
return true;
},
offset);

// apply in-memory mutations if replay private logs succeed
if (ec == ERR_OK) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
ddebug_replica(
"replay private_log files succeed, file count={}, app last_committed_decree={}",
plog_files.size(),
_app->last_committed_decree());

int count = 0;
for (mutation_ptr &mu : mutation_list) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
continue;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr && origin_mu->data.header.ballot >= mu->data.header.ballot) {
continue;
}
if (!mu->is_logged()) {
mu->set_logged();
}
plist.prepare(mu, partition_status::PS_SECONDARY);
++count;
}
plist.commit(last_committed_decree, COMMIT_TO_DECREE_HARD);
ddebug_replica(
"apply in-memory mutations succeed, mutation count={}, app last_committed_decree={}",
count,
_app->last_committed_decree());
}
return ec;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_catch_up_states() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {});
// TODO(heyuchen): TBD

if (status() != partition_status::PS_PARTITION_SPLIT) {
derror_replica("wrong status = {}", status());
hycdong marked this conversation as resolved.
Show resolved Hide resolved
return;
}

decree goal_decree = _prepare_list->last_committed_decree();
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
decree local_decree = _app->last_committed_decree();

// there are still some mutations child not learn
hycdong marked this conversation as resolved.
Show resolved Hide resolved
if (local_decree < goal_decree) {
if (local_decree >=
_prepare_list->min_decree()) { // missing mutations are all in prepare_list
dwarn_replica("there are some in-memory mutations should be learned, app "
"last_committed_decree={}, "
"goal decree={}, prepare_list min_decree={}",
local_decree,
goal_decree,
_prepare_list->min_decree());
for (decree d = local_decree + 1; d <= goal_decree; ++d) {
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "");
error_code ec = _app->apply_mutation(mu);
if (ec != ERR_OK) {
child_handle_split_error("child_catchup failed because apply mutation failed");
return;
}
}
} else { // some missing mutations are not in memory
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
dwarn_replica(
"there are some private logs should be learned, app last_committed_decree="
"{}, prepare_list min_decree={}, please wait",
local_decree,
_prepare_list->min_decree());
_split_states.async_learn_task = tasking::enqueue(
LPC_CATCHUP_WITH_PRIVATE_LOGS,
tracker(),
[this]() {
this->catch_up_with_private_logs(partition_status::PS_PARTITION_SPLIT);
hycdong marked this conversation as resolved.
Show resolved Hide resolved
_split_states.async_learn_task = nullptr;
},
get_gpid().thread_hash());
return;
}
}

ddebug_replica("child catch up parent states, goal decree={}, local decree={}",
_prepare_list->last_committed_decree(),
_app->last_committed_decree());
_split_states.is_caught_up = true;

child_notify_catch_up();
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_notify_catch_up() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_notify_catch_up", [](dsn::string_view) {});
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/lib/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <fstream>
#include <sstream>
#include <memory>
#include <dsn/utility/fail_point.h>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -454,6 +455,9 @@ int replication_app_base::on_batched_write_requests(int64_t decree,

::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
{
FAIL_POINT_INJECT_F("replication_app_base_apply_mutation",
[](dsn::string_view) { return ERR_OK; });

dassert(mu->data.header.decree == last_committed_decree() + 1,
"invalid mutation decree, decree = %" PRId64 " VS %" PRId64 "",
mu->data.header.decree,
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class mock_replica : public replica
void set_init_child_ballot(ballot b) { _child_init_ballot = b; }
void set_last_committed_decree(decree d) { _prepare_list->reset(d); }
prepare_list *get_plist() { return _prepare_list; }
void prepare_list_truncate(decree d) { _prepare_list->truncate(d); }
void prepare_list_commit_hard(decree d) { _prepare_list->commit(d, COMMIT_TO_DECREE_HARD); }
};
typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;

Expand Down
Loading