Skip to content

Commit

Permalink
feat(dup_enhancement#7): implement prepare_dup function in replica …
Browse files Browse the repository at this point in the history
…side for meta `DS_PREPARE` status (apache#1053)
  • Loading branch information
foreverneverer committed Mar 29, 2022
1 parent 203bb54 commit 33f8fce
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 9 deletions.
15 changes: 13 additions & 2 deletions src/replica/duplication/replica_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,19 @@ replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r)
prepare_dup();
}

// todo(jiashuo1) wait detail implementation
void replica_duplicator::prepare_dup() {}
void replica_duplicator::prepare_dup()
{
ddebug_replica("start prepare checkpoint to catch up with latest durable decree: "
"start_point_decree({}) vs last_durable_decree({})",
_start_point_decree,
_replica->last_durable_decree());

tasking::enqueue(
LPC_REPLICATION_COMMON,
&_tracker,
[this]() { _replica->trigger_manual_emergency_checkpoint(_start_point_decree); },
get_gpid().thread_hash());
}

void replica_duplicator::start_dup_log()
{
Expand Down
1 change: 1 addition & 0 deletions src/replica/duplication/replica_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class replica_duplicator : public replica_base, public pipeline::base
uint64_t get_pending_mutations_count() const;

private:
friend class duplication_test_base;
friend class replica_duplicator_test;
friend class duplication_sync_timer_test;
friend class load_from_private_log_test;
Expand Down
14 changes: 12 additions & 2 deletions src/replica/duplication/test/duplication_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,18 @@ class duplication_test_base : public replica_test_base
return dup_entities[dupid].get();
}

std::unique_ptr<replica_duplicator> create_test_duplicator(decree confirmed = invalid_decree)
std::unique_ptr<replica_duplicator> create_test_duplicator(decree confirmed = invalid_decree,
decree start = invalid_decree)
{
duplication_entry dup_ent;
dup_ent.dupid = 1;
dup_ent.remote = "remote_address";
dup_ent.status = duplication_status::DS_PAUSE;
dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed;
return make_unique<replica_duplicator>(dup_ent, _replica.get());

auto duplicator = make_unique<replica_duplicator>(dup_ent, _replica.get());
duplicator->_start_point_decree = start;
return duplicator;
}

std::map<int, log_file_ptr> open_log_file_map(const std::string &log_dir)
Expand All @@ -78,6 +82,12 @@ class duplication_test_base : public replica_test_base
mut->data.updates[0].code = RPC_DUPLICATION_IDEMPOTENT_WRITE; // must be idempotent write
return mut;
}

void wait_all(const std::unique_ptr<replica_duplicator> &dup)
{
dup->tracker()->wait_outstanding_tasks();
dup->_replica->tracker()->wait_outstanding_tasks();
}
};

} // namespace replication
Expand Down
24 changes: 23 additions & 1 deletion src/replica/duplication/test/replica_duplicator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,20 @@ namespace replication {
class replica_duplicator_test : public duplication_test_base
{
public:
replica_duplicator_test() { _replica->init_private_log(_log_dir); }
replica_duplicator_test()
{
_replica->set_partition_status(partition_status::PS_PRIMARY);
_replica->init_private_log(_log_dir);
}

mock_replica *replica() { return _replica.get(); }

decree last_durable_decree() const { return _replica->last_durable_decree(); }

decree log_dup_start_decree(const std::unique_ptr<replica_duplicator> &dup) const
{
return dup->_start_point_decree;
}

void test_new_duplicator()
{
Expand Down Expand Up @@ -132,5 +145,14 @@ TEST_F(replica_duplicator_test, duplication_progress)
"last_decree(10) should always larger than confirmed_decree(12)"));
}

TEST_F(replica_duplicator_test, prapre_dup)
{
auto duplicator = create_test_duplicator(invalid_decree, 100);
replica()->update_expect_last_durable_decree(100);
duplicator->prepare_dup();
wait_all(duplicator);
ASSERT_EQ(last_durable_decree(), log_dup_start_decree(duplicator));
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
//
// Duplication
//
error_code trigger_manual_emergency_checkpoint(decree old_decree);
replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); }
bool is_duplicating() const { return _duplicating; }

Expand Down Expand Up @@ -466,6 +467,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_duplicator_manager;
friend class load_mutation;
friend class replica_split_test;
friend class replica_test_base;
friend class replica_test;
friend class replica_backup_manager;
friend class replica_bulk_loader;
Expand Down Expand Up @@ -549,6 +551,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
bool _is_manual_emergency_checkpointing{false};
bool _duplicating{false};

// backup
Expand Down
41 changes: 41 additions & 0 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "replica_stub.h"
#include "duplication/replica_duplicator_manager.h"
#include "split/replica_split_manager.h"
#include "dsn/utility/fail_point.h"
#include <dsn/utility/filesystem.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -123,6 +124,46 @@ void replica::on_checkpoint_timer()
}
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica::trigger_manual_emergency_checkpoint(decree old_decree)
{
_checker.only_one_thread_access();

if (_app == nullptr) {
derror_replica("app hasn't been init or has been released");
return ERR_LOCAL_APP_FAILURE;
}

if (old_decree <= _app->last_durable_decree()) {
ddebug_replica("checkpoint has been successful: old = {} vs latest = {}",
old_decree,
_app->last_durable_decree());
_is_manual_emergency_checkpointing = false;
_stub->_manual_emergency_checkpointing_count == 0
? 0
: (--_stub->_manual_emergency_checkpointing_count);
return ERR_OK;
}

if (_is_manual_emergency_checkpointing) {
dwarn_replica("replica is checkpointing, last_durable_decree = {}",
_app->last_durable_decree());
return ERR_BUSY;
}

if (++_stub->_manual_emergency_checkpointing_count >
FLAGS_max_concurrent_manual_emergency_checkpointing_count) {
dwarn_replica("please try again later because checkpointing exceed max running count[{}]",
FLAGS_max_concurrent_manual_emergency_checkpointing_count);
--_stub->_manual_emergency_checkpointing_count;
return ERR_TRY_AGAIN;
}

_is_manual_emergency_checkpointing = true;
init_checkpoint(true);
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::init_checkpoint(bool is_emergency)
{
Expand Down
7 changes: 7 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ DSN_DEFINE_bool("replication",
true,
"true means ignore broken data disk when initialize");

DSN_DEFINE_uint32("replication",
max_concurrent_manual_emergency_checkpointing_count,
1,
"max concurrent manual emergency checkpoint running count");
DSN_TAG_VARIABLE(max_concurrent_manual_emergency_checkpointing_count, FT_MUTABLE);

bool replica_stub::s_not_exit_on_log_failure = false;

replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
Expand All @@ -91,6 +97,7 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_learn_app_concurrent_count(0),
_fs_manager(false),
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
_is_running(false)
{
#ifdef DSN_ENABLE_GPERF
Expand Down
7 changes: 6 additions & 1 deletion src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
namespace dsn {
namespace replication {

DSN_DECLARE_uint32(max_concurrent_manual_emergency_checkpointing_count);

typedef rpc_holder<group_check_response, learn_notify_response> learn_completion_notification_rpc;
typedef rpc_holder<group_check_request, group_check_response> group_check_rpc;
typedef rpc_holder<query_replica_decree_request, query_replica_decree_response>
Expand Down Expand Up @@ -404,9 +406,12 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// write body size exceed this threshold will be logged and reject, 0 means no check
uint64_t _max_allowed_write_size;

// replica count exectuting bulk load downloading concurrently
// replica count executing bulk load downloading concurrently
std::atomic_int _bulk_load_downloading_count;

// replica count executing emergency checkpoint concurrently
std::atomic_int _manual_emergency_checkpointing_count;

bool _is_running;

#ifdef DSN_ENABLE_GPERF
Expand Down
27 changes: 25 additions & 2 deletions src/replica/test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ class mock_replication_app_base : public replication_app_base
error_code start(int, char **) override { return ERR_NOT_IMPLEMENTED; }
error_code stop(bool) override { return ERR_NOT_IMPLEMENTED; }
error_code sync_checkpoint() override { return ERR_OK; }
error_code async_checkpoint(bool) override { return ERR_NOT_IMPLEMENTED; }
error_code async_checkpoint(bool) override
{
_last_durable_decree = _expect_last_durable_decree;
return ERR_OK;
}
error_code prepare_get_checkpoint(blob &) override { return ERR_NOT_IMPLEMENTED; }
error_code get_checkpoint(int64_t, const blob &, learn_state &) override
{
Expand All @@ -73,7 +77,7 @@ class mock_replication_app_base : public replication_app_base
// we mock the followings
void update_app_envs(const std::map<std::string, std::string> &envs) override { _envs = envs; }
void query_app_envs(std::map<std::string, std::string> &out) override { out = _envs; }
decree last_durable_decree() const override { return 0; }
decree last_durable_decree() const override { return _last_durable_decree; }

// TODO(heyuchen): implement this function in further pull request
void set_partition_version(int32_t partition_version) override {}
Expand All @@ -88,10 +92,16 @@ class mock_replication_app_base : public replication_app_base
return manual_compaction_status::IDLE;
}

void set_last_durable_decree(decree d) { _last_durable_decree = d; }

void set_expect_last_durable_decree(decree d) { _expect_last_durable_decree = d; }

private:
std::map<std::string, std::string> _envs;
decree _decree = 5;
ingestion_status::type _ingestion_status;
decree _last_durable_decree{0};
decree _expect_last_durable_decree{0};
};

class mock_replica : public replica
Expand All @@ -112,6 +122,7 @@ class mock_replica : public replica
~mock_replica() override
{
_config.status = partition_status::PS_INACTIVE;
_tracker.wait_outstanding_tasks();
_app.reset(nullptr);
}

Expand Down Expand Up @@ -194,8 +205,20 @@ class mock_replica : public replica
backup_context->complete_checkpoint();
}

void update_last_durable_decree(decree decree)
{
dynamic_cast<mock_replication_app_base *>(_app.get())->set_last_durable_decree(decree);
}

void update_expect_last_durable_decree(decree decree)
{
dynamic_cast<mock_replication_app_base *>(_app.get())
->set_expect_last_durable_decree(decree);
}

private:
decree _max_gced_decree{invalid_decree - 1};
decree _last_durable_decree{0};
};
typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;

Expand Down
32 changes: 31 additions & 1 deletion src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include <dsn/dist/replication/replica_envs.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/fail_point.h>
#include <gtest/gtest.h>
#include "runtime/rpc/network.sim.h"

Expand Down Expand Up @@ -151,6 +150,13 @@ class replica_test : public replica_test_base
return _mock_replica->find_valid_checkpoint(req, remote_chkpt_dir);
}

void force_update_checkpointing(bool running)
{
_mock_replica->_is_manual_emergency_checkpointing = running;
}

bool is_checkpointing() { return _mock_replica->_is_manual_emergency_checkpointing; }

public:
dsn::app_info _app_info;
dsn::gpid pid;
Expand Down Expand Up @@ -318,5 +324,29 @@ TEST_F(replica_test, test_replica_backup_and_restore_with_specific_path)
ASSERT_EQ(ERR_OK, err);
}

TEST_F(replica_test, trigger_manual_emergency_checkpoint)
{
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
ASSERT_TRUE(is_checkpointing());
_mock_replica->update_last_durable_decree(100);

// test no need start checkpoint because `old_decree` < `last_durable`
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
ASSERT_FALSE(is_checkpointing());

// test has existed running task
force_update_checkpointing(true);
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_BUSY);
ASSERT_TRUE(is_checkpointing());
force_update_checkpointing(false);

// test exceed max concurrent count
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_OK);
force_update_checkpointing(false);
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_TRY_AGAIN);
ASSERT_FALSE(is_checkpointing());
_mock_replica->tracker()->wait_outstanding_tasks();
}

} // namespace replication
} // namespace dsn

0 comments on commit 33f8fce

Please sign in to comment.