Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(dup): preserve data consistency during replica learn (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Dec 26, 2019
1 parent c3f2cd4 commit 7fc0b25
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 36 deletions.
8 changes: 8 additions & 0 deletions src/dist/replication/lib/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ class mutation_log : public ref_counter
return replay_block(log, callback, start_offset, end_offset);
}

// Resets the private log with the log files under `dir`.
// The original plog will be removed after this call.
// NOTE: the private log should be opened before this method is called.
//
// This base implementation performs no reset: it ignores both arguments
// (the `io_failure_callback` parameter is left unnamed) and always returns
// ERR_NOT_IMPLEMENTED. Since it is `virtual`, it is presumably meant to be
// overridden by the concrete private-log class -- confirm against subclasses.
virtual error_code reset_from(const std::string &dir, io_failure_callback)
{
return ERR_NOT_IMPLEMENTED;
}

//
// maintain max_decree & valid_start_offset
//
Expand Down
15 changes: 15 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// routine for get extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }

protected:
// this method is marked protected to enable us to mock it in unit tests.
virtual decree max_gced_decree_no_lock() const;

private:
// common helpers
void init_state();
Expand Down Expand Up @@ -234,6 +238,17 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void notify_learn_completion();
error_code apply_learned_state_from_private_log(learn_state &state);

// Gets the position where this round of the learning process should begin.
// This method is called on primary-side.
// TODO(wutao1): mark it const
decree get_learn_start_decree(const learn_request &req);

// This method differs from `_private_log->max_gced_decree()` in that
// it also takes the `learn/` dir into account, since the learned logs are
// part of the plog as well.
// This method is called on the learner side.
decree get_max_gced_decree_for_learn() const;

/////////////////////////////////////////////////////////////////
// failure handling
void handle_local_failure(error_code error);
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ bool potential_secondary_context::cleanup(bool force)
learn_app_concurrent_count_increased = false;
}
learning_start_prepare_decree = invalid_decree;
first_learn_start_decree = invalid_decree;
learning_status = learner_status::LearningInvalid;
return true;
}
Expand Down
20 changes: 8 additions & 12 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool-api/zlocks.h>
Expand Down Expand Up @@ -130,7 +121,7 @@ class secondary_context
class potential_secondary_context
{
public:
potential_secondary_context(replica *r)
explicit potential_secondary_context(replica *r)
: owner_replica(r),
learning_version(0),
learning_start_ts_ns(0),
Expand Down Expand Up @@ -163,6 +154,10 @@ class potential_secondary_context
volatile bool learn_app_concurrent_count_increased;
decree learning_start_prepare_decree;

// The start decree in the first round of learn.
// It indicates the minimum decree under `learn/` dir.
decree first_learn_start_decree{invalid_decree};

::dsn::task_ptr delay_learning_task;
::dsn::task_ptr learning_task;
::dsn::task_ptr learn_remote_files_task;
Expand Down Expand Up @@ -425,6 +420,7 @@ class cold_backup_context : public ref_counter
uint64_t get_upload_file_size() { return _upload_file_size.load(); }

int64_t get_checkpoint_total_size() { return checkpoint_file_total_size; }

private:
void read_current_chkpt_file(const dist::block_service::block_file_ptr &file_handle);
void remote_chkpt_dir_exist(const std::string &chkpt_dirname);
Expand Down Expand Up @@ -545,5 +541,5 @@ inline partition_status::type primary_context::get_node_status(::dsn::rpc_addres
auto it = statuses.find(addr);
return it != statuses.end() ? it->second : partition_status::PS_INACTIVE;
}
}
} // end namespace
} // namespace replication
} // namespace dsn
Loading

0 comments on commit 7fc0b25

Please sign in to comment.