Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix(duplication): implement reset_from to fix plog loss when duplicating with learning #845

Merged
merged 30 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <dsn/utils/latency_tracer.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/crc.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
Expand Down Expand Up @@ -916,6 +917,65 @@ int64_t mutation_log::total_size_no_lock() const
return _log_files.size() > 0 ? _global_end_offset - _global_start_offset : 0;
}

error_code mutation_log::reset_from(const std::string &dir,
replay_callback replay_error_callback,
io_failure_callback write_error_callback)
{
error_code err = ERR_FILE_OPERATION_FAILED;

// close for flushing current log and be ready to open new log files after reset
close();

// make sure logs in `dir` (such as /learn) are valid.
error_s es = log_utils::check_log_files_continuity(dir);
if (!es.is_ok()) {
derror_f("the log of source dir {} is invalid:{}, will remove it.", dir, es);
if (!utils::filesystem::remove_path(dir)) {
derror_f("remove {} failed", dir);
return err;
}
return es.code();
}

std::string temp_dir = _dir + '.' + std::to_string(dsn_now_ns());
if (!utils::filesystem::rename_path(_dir, temp_dir)) {
derror_f("rename {} to {} failed", _dir, temp_dir);
return err;
}
ddebug_f("moved current log dir {} to tmp_dir {}", _dir, temp_dir);
// define `defer` for rollback temp_dir when failed or remove temp_dir when success
auto temp_dir_resolve = dsn::defer([this, err, temp_dir]() {
if (err != ERR_OK) {
if (!utils::filesystem::rename_path(temp_dir, _dir)) {
// rollback failed means old log files are not be recovered, it may be lost if only
// derror, dassert for manual resolve it
dassert_f("rollback {} to {} failed", temp_dir, _dir);
}
} else {
if (!dsn::utils::filesystem::remove_path(temp_dir)) {
// temp dir allow delete failed, it's only garbage
derror_f("remove temp dir {} failed", temp_dir);
}
}
});

// move source dir to target dir
if (!utils::filesystem::rename_path(dir, _dir)) {
derror_f("rename {} to {} failed", dir, _dir);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return err;
}
ddebug_f("move {} to {} as our new log directory", dir, _dir);

// - make sure logs in moved dir(such as /plog) are valid and can be opened successfully.
// - re-open new log files for loading the new log file and register the files into replica,
// please make sure the old log files has been closed
err = open(replay_error_callback, write_error_callback);
if (err != ERR_OK) {
derror_f("the logs of moved dir {} are invalid and open failed:{}", _dir, err);
}
return err;
}

void mutation_log::set_valid_start_offset_on_open(gpid gpid, int64_t valid_start_offset)
{
zauto_lock l(_lock);
Expand Down
11 changes: 4 additions & 7 deletions src/replica/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,10 @@ class mutation_log : public ref_counter
return replay_block(log, callback, start_offset, end_offset);
}

// Resets private-log with log files under `dir`.
// The original plog will be removed after this call.
// NOTE: private-log should be opened before this method called.
virtual error_code reset_from(const std::string &dir, io_failure_callback)
{
return ERR_NOT_IMPLEMENTED;
}
// Resets mutation log with log files under `dir`.
// The original log will be removed after this call.
// NOTE: log should be opened before this method called. now it only be used private log
error_code reset_from(const std::string &dir, replay_callback, io_failure_callback);

//
// maintain max_decree & valid_start_offset
Expand Down
138 changes: 76 additions & 62 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,24 +568,25 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response
return;
}

ddebug("%s: on_learn_reply[%016" PRIx64 "]: learnee = %s, learn_duration = %" PRIu64
" ms, response_err = %s, remote_committed_decree = %" PRId64 ", "
"prepare_start_decree = %" PRId64 ", learn_type = %s, learned_buffer_size = %u, "
"learned_file_count = %u, to_decree_included = %" PRId64
", learn_start_decree = %" PRId64 ", current_learning_status = %s",
name(),
req.signature,
resp.config.primary.to_string(),
_potential_secondary_states.duration_ms(),
resp.err.to_string(),
resp.last_committed_decree,
resp.prepare_start_decree,
enum_to_string(resp.type),
resp.state.meta.length(),
static_cast<uint32_t>(resp.state.files.size()),
resp.state.to_decree_included,
resp.state.learn_start_decree,
enum_to_string(_potential_secondary_states.learning_status));
ddebug_replica(
"on_learn_reply_start[{}]: learnee = {}, learn_duration ={} ms, response_err = "
"{}, remote_committed_decree = {}, prepare_start_decree = {}, learn_type = {} "
"learned_buffer_size = {}, learned_file_count = {},to_decree_included = "
"{}, learn_start_decree = {}, last_commit_decree = {}, current_learning_status = "
"{} ",
req.signature,
resp.config.primary.to_string(),
_potential_secondary_states.duration_ms(),
resp.err.to_string(),
resp.last_committed_decree,
resp.prepare_start_decree,
enum_to_string(resp.type),
resp.state.meta.length(),
static_cast<uint32_t>(resp.state.files.size()),
resp.state.to_decree_included,
resp.state.learn_start_decree,
_app->last_committed_decree(),
enum_to_string(_potential_secondary_states.learning_status));

_potential_secondary_states.learning_copy_buffer_size += resp.state.meta.length();
_stub->_counter_replicas_learning_recent_copy_buffer_size->add(resp.state.meta.length());
Expand Down Expand Up @@ -1507,7 +1508,14 @@ void replica::on_add_learner(const group_check_request &request)
error_code replica::apply_learned_state_from_private_log(learn_state &state)
{
bool duplicating = is_duplicating();

// if no dunplicate, learn_start_decree=last_commit decree, step_back means whether
// `learn_start_decree`should be stepped back to include all the
// unconfirmed when duplicating in this round of learn. default is false
bool step_back = false;

// in this case, this means `learn_start_decree` must have been stepped back to include all the
// unconfirmed(learn_start_decree=last_confirmed_decree) when duplicating in this round of
// learn.
// confirmed gced committed
// | | |
// learner's plog: ============[-----log------]
Expand All @@ -1518,28 +1526,33 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
// ==> learn_start_decree |
// learner's plog | committed
// after applied: [---------------log----------------]

if (duplicating && state.__isset.learn_start_decree &&
state.learn_start_decree < _app->last_committed_decree() + 1) {
// it means this round of learn must have been stepped back
// to include all the unconfirmed.

// move the `learn/` dir to working dir (`plog/`).
error_code err = _private_log->reset_from(_app->learn_dir(), [this](error_code err) {
tasking::enqueue(LPC_REPLICATION_ERROR,
&_tracker,
[this, err]() { handle_local_failure(err); },
get_gpid().thread_hash());
});
ddebug_replica("learn_start_decree({}) < _app->last_committed_decree() + 1({}), learn "
"must stepped back to include all the unconfirmed ",
state.learn_start_decree,
_app->last_committed_decree() + 1);

// move the `learn/` dir to working dir (`plog/`) to replace current log files to replay
error_code err = _private_log->reset_from(
_app->learn_dir(),
[](int log_length, mutation_ptr &mu) { return true; },
[this](error_code err) {
tasking::enqueue(LPC_REPLICATION_ERROR,
&_tracker,
[this, err]() { handle_local_failure(err); },
get_gpid().thread_hash());
});
if (err != ERR_OK) {
derror_replica("failed to reset this private log with logs in learn/ dir: {}", err);
return err;
}

// only the uncommitted logs will be replayed and applied into storage.
// only select uncommitted logs to be replayed and applied into storage.
learn_state tmp_state;
_private_log->get_learn_state(get_gpid(), _app->last_committed_decree() + 1, tmp_state);
state.files = tmp_state.files;
step_back = true;
}

int64_t offset;
Expand All @@ -1549,13 +1562,15 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
prepare_list plist(this,
_app->last_committed_decree(),
_options->max_mutation_count_in_prepare_list,
[this, duplicating](mutation_ptr &mu) {
[this, duplicating, step_back](mutation_ptr &mu) {
if (mu->data.header.decree == _app->last_committed_decree() + 1) {
// TODO: assign the returned error_code to err and check it
_app->apply_mutation(mu);

// appends logs-in-cache into plog to ensure them can be duplicated.
if (duplicating) {
// if current case is step back, it means the logs has been reserved
// through `reset_form` above
if (duplicating && !step_back) {
_private_log->append(
mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);
}
Expand Down Expand Up @@ -1598,18 +1613,19 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
_potential_secondary_states.first_learn_start_decree = state.learn_start_decree;
}

ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, "
"learn_duration = %" PRIu64 " ms, apply private log files done, "
"file_count = %d, first_learn_start_decree = %" PRId64 ", learn_start_decree = %" PRId64
", app_committed_decree = %" PRId64,
name(),
_potential_secondary_states.learning_version,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
static_cast<int>(state.files.size()),
_potential_secondary_states.first_learn_start_decree,
state.learn_start_decree,
_app->last_committed_decree());
ddebug_replica("apply_learned_state_from_private_log[{}]: duplicating={}, step_back={}, "
"learnee = {}, learn_duration = {} ms, apply private log files done, file_count "
"={}, first_learn_start_decree ={}, learn_start_decree = {}, "
"app_committed_decree = {}",
_potential_secondary_states.learning_version,
duplicating,
step_back,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
state.files.size(),
_potential_secondary_states.first_learn_start_decree,
state.learn_start_decree,
_app->last_committed_decree());

// apply in-buffer private logs
if (err == ERR_OK) {
Expand All @@ -1631,26 +1647,24 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
}

if (state.to_decree_included > last_committed_decree()) {
ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, "
"learned_to_decree_included(%" PRId64 ") > last_committed_decree(%" PRId64 "), "
"commit to to_decree_included",
name(),
_potential_secondary_states.learning_version,
_config.primary.to_string(),
state.to_decree_included,
last_committed_decree());
ddebug_replica("apply_learned_state_from_private_log[{}]: learnee ={}, "
"learned_to_decree_included({}) > last_committed_decree({}), commit to "
"to_decree_included",
_potential_secondary_states.learning_version,
_config.primary.to_string(),
state.to_decree_included,
last_committed_decree());
plist.commit(state.to_decree_included, COMMIT_TO_DECREE_SOFT);
}

ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, "
"learn_duration = %" PRIu64 " ms, apply in-buffer private logs done, "
"replay_count = %d, app_committed_decree = %" PRId64,
name(),
_potential_secondary_states.learning_version,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
replay_count,
_app->last_committed_decree());
ddebug_replica(" apply_learned_state_from_private_log[{}]: learnee ={}, "
"learn_duration ={} ms, apply in-buffer private logs done, "
"replay_count ={}, app_committed_decree = {}",
_potential_secondary_states.learning_version,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
replay_count,
_app->last_committed_decree());
}

// awaits for unfinished mutation writes.
Expand Down
Loading