Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): child replica apply private logs, in-memory mutations and catch up parent #319

Merged
merged 21 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {
Expand Down
11 changes: 7 additions & 4 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,19 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
uint64_t total_file_size,
decree last_committed_decree);

error_code child_replay_private_log(std::vector<std::string> plog_files,
// TODO(heyuchen): total_file_size is used for split perf-counter in further pull request
hycdong marked this conversation as resolved.
Show resolved Hide resolved
// child replay private logs and learn in-memory mutations
error_code child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// child send notification to primary parent when it finish async learn
void child_notify_catch_up();

// return true if parent status is valid
bool parent_check_states();

Expand Down
9 changes: 9 additions & 0 deletions src/dist/replication/lib/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "duplication/replica_duplicator_manager.h"
#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -360,6 +361,8 @@ error_code replica::background_sync_checkpoint()
// in non-replication thread
void replica::catch_up_with_private_logs(partition_status::type s)
{
FAIL_POINT_INJECT_F("replica_chkpt_catch_up_with_private_logs", [this](dsn::string_view) {});

learn_state state;
_private_log->get_learn_state(get_gpid(), _app->last_committed_decree() + 1, state);

Expand All @@ -372,6 +375,12 @@ void replica::catch_up_with_private_logs(partition_status::type s)
[this, err]() { this->on_learn_remote_state_completed(err); },
get_gpid().thread_hash());
_potential_secondary_states.learn_remote_files_completed_task->enqueue();
} else if (s == partition_status::PS_PARTITION_SPLIT) {
_split_states.async_learn_task =
tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
} else {
_secondary_states.checkpoint_completed_task =
tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED,
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ bool partition_split_context::cleanup(bool force)

parent_gpid.set_app_id(0);
is_prepare_list_copied = false;
is_caught_up = false;
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,14 @@ typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
class partition_split_context
{
public:
partition_split_context() : is_prepare_list_copied(false) {}
partition_split_context() : is_prepare_list_copied(false), is_caught_up(false) {}
bool cleanup(bool force);
bool is_cleaned() const;

public:
gpid parent_gpid;
bool is_prepare_list_copied;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public 的变量务必要写注释, 之前 codereview 规范不严格, 现在 is_prepare_list_copied 具体指的是啥我都忘了. 另外 is_caught_up 是做什么用的?我现在只看到了有对这个变量 set,没有看到 get。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个变量后面的pr会get

bool is_caught_up;
hycdong marked this conversation as resolved.
Show resolved Hide resolved

// child replica async learn parent states
dsn::task_ptr async_learn_task;
Expand Down
158 changes: 135 additions & 23 deletions src/dist/replication/lib/replica_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void replica::child_init_replica(gpid parent_gpid,
// init split states
_split_states.parent_gpid = parent_gpid;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
_split_states.is_prepare_list_copied = false;
_split_states.is_caught_up = false;

ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid);

Expand Down Expand Up @@ -279,34 +280,28 @@ void replica::child_learn_states(learn_state lstate,
plog_files.size(),
mutation_list.size());

// apply parent checkpoint
error_code err;
auto cleanup = defer([this, &err]() {
if (err != ERR_OK) {
child_handle_async_learn_error();
}
});

// apply parent checkpoint
err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
if (err != ERR_OK) {
derror_replica("failed to apply checkpoint, error={}", err);
return;
}

// replay parent private log
err = child_replay_private_log(plog_files, total_file_size, last_committed_decree);
// replay parent private log and learn in-memory mutations
err =
child_apply_private_logs(plog_files, mutation_list, total_file_size, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to replay private log, error={}", err);
return;
}

// learn parent in-memory mutations
err = child_learn_mutations(mutation_list, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to learn mutations, error={}", err);
return;
}

// generate a checkpoint synchronously
err = _app->sync_checkpoint();
if (err != ERR_OK) {
Expand All @@ -330,30 +325,147 @@ void replica::child_learn_states(learn_state lstate,
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_replay_private_log(std::vector<std::string> plog_files,
error_code replica::child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_replay_private_log",
FAIL_POINT_INJECT_F("replica_child_apply_private_logs",
[](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
}
FAIL_POINT_INJECT_F("replica_child_apply_private_logs_error",
hycdong marked this conversation as resolved.
Show resolved Hide resolved
[](dsn::string_view) { return ERR_FILE_OPERATION_FAILED; });

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_mutations", [](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status=", enum_to_string(status()));
hycdong marked this conversation as resolved.
Show resolved Hide resolved
return ERR_INVALID_STATE;
}

error_code ec;
int64_t offset;
// temp prepare_list used for apply states
prepare_list plist(this,
_app->last_committed_decree(),
_options->max_mutation_count_in_prepare_list,
[this, &ec](mutation_ptr &mu) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
if (mu->data.header.decree == _app->last_committed_decree() + 1) {
_app->apply_mutation(mu);
}
});

// replay private log
ec = mutation_log::replay(
plog_files,
[this, &plist](int log_length, mutation_ptr &mu) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
return false;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr && origin_mu->data.header.ballot >= mu->data.header.ballot) {
dwarn_replica("mutation({}) existed, origin mutation={}, ignore obsolete mutation",
hycdong marked this conversation as resolved.
Show resolved Hide resolved
mu->name(),
origin_mu->data.header.ballot);
return false;
}
plist.prepare(mu, partition_status::PS_SECONDARY);
return true;
},
offset);

// apply in-memory mutations if replay private logs succeed
if (ec == ERR_OK) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
ddebug_replica(
"replay private_log files succeed, file count={}, app last_committed_decree={}",
plog_files.size(),
_app->last_committed_decree());

int count = 0;
for (mutation_ptr &mu : mutation_list) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
continue;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr && origin_mu->data.header.ballot >= mu->data.header.ballot) {
continue;
}
if (!mu->is_logged()) {
mu->set_logged();
}
plist.prepare(mu, partition_status::PS_SECONDARY);
++count;
}
plist.commit(last_committed_decree, COMMIT_TO_DECREE_HARD);
ddebug_replica(
"apply in-memory mutations succeed, mutation count={}, app last_committed_decree={}",
count,
_app->last_committed_decree());
}
return ec;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_catch_up_states() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {});
// TODO(heyuchen): TBD

if (status() != partition_status::PS_PARTITION_SPLIT) {
derror_replica("wrong status = {}", status());
hycdong marked this conversation as resolved.
Show resolved Hide resolved
return;
}

decree goal_decree = _prepare_list->last_committed_decree();
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
decree local_decree = _app->last_committed_decree();

// there are still some mutations child not learn
hycdong marked this conversation as resolved.
Show resolved Hide resolved
if (local_decree < goal_decree) {
if (local_decree >=
_prepare_list->min_decree()) { // missing mutations are all in prepare_list
dwarn_replica("there are some in-memory mutations should be learned, app "
"last_committed_decree={}, "
"goal decree={}, prepare_list min_decree={}",
local_decree,
goal_decree,
_prepare_list->min_decree());
for (decree d = local_decree + 1; d <= goal_decree; ++d) {
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "");
error_code ec = _app->apply_mutation(mu);
if (ec != ERR_OK) {
child_handle_split_error("child_catchup failed because apply mutation failed");
return;
}
}
} else { // some missing mutations are not in memory
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
dwarn_replica(
"there are some private logs should be learned, app last_committed_decree="
"{}, prepare_list min_decree={}, please wait",
local_decree,
_prepare_list->min_decree());
_split_states.async_learn_task = tasking::enqueue(
LPC_CATCHUP_WITH_PRIVATE_LOGS,
tracker(),
[this]() {
this->catch_up_with_private_logs(partition_status::PS_PARTITION_SPLIT);
hycdong marked this conversation as resolved.
Show resolved Hide resolved
_split_states.async_learn_task = nullptr;
},
get_gpid().thread_hash());
return;
}
}

ddebug_replica("child catch up parent states, goal decree={}, local decree={}",
_prepare_list->last_committed_decree(),
_app->last_committed_decree());
_split_states.is_caught_up = true;

child_notify_catch_up();
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_notify_catch_up() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_notify_catch_up", [](dsn::string_view) {});
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/lib/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <fstream>
#include <sstream>
#include <memory>
#include <dsn/utility/fail_point.h>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -456,6 +457,9 @@ int replication_app_base::on_batched_write_requests(int64_t decree,

::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
{
FAIL_POINT_INJECT_F("replication_app_base_apply_mutation",
[](dsn::string_view) { return ERR_OK; });

dassert(mu->data.header.decree == last_committed_decree() + 1,
"invalid mutation decree, decree = %" PRId64 " VS %" PRId64 "",
mu->data.header.decree,
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class mock_replica : public replica
void set_init_child_ballot(ballot b) { _child_init_ballot = b; }
void set_last_committed_decree(decree d) { _prepare_list->reset(d); }
prepare_list *get_plist() { return _prepare_list; }
void prepare_list_truncate(decree d) { _prepare_list->truncate(d); }
void prepare_list_commit_hard(decree d) { _prepare_list->commit(d, COMMIT_TO_DECREE_HARD); }

private:
decree _max_gced_decree{invalid_decree - 1};
Expand Down
Loading