Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(dup): protect private log from missing when duplication is enabl…
Browse files Browse the repository at this point in the history
…ed (#320)
  • Loading branch information
Wu Tao authored Dec 3, 2019
1 parent 0de9e39 commit 503fdf6
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,28 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
}
}

/// Returns the lowest confirmed decree known to this replica, or invalid_decree
/// when there is nothing to report.
///
/// On a primary the value is folded over every entry of `_duplications`; on any
/// other status it is the last positive value recorded in
/// `_primary_confirmed_decree`. The whole computation runs under `_lock`.
decree replica_duplicator_manager::min_confirmed_decree() const
{
    zauto_lock l(_lock);

    if (_replica->status() != partition_status::PS_PRIMARY) {
        // if the replica is not primary, use the latest known (from primary)
        // confirmed_decree instead.
        return _primary_confirmed_decree > 0 ? _primary_confirmed_decree : invalid_decree;
    }

    decree lowest = invalid_decree;
    for (auto &kv : _duplications) {
        const decree confirmed = kv.second->progress().confirmed_decree;
        // NOTE(review): invalid_decree doubles as the "nothing seen yet" marker, so an
        // entry whose confirmed_decree is still invalid_decree gets overwritten by the
        // next entry instead of pinning the minimum -- confirm this is intended.
        lowest = (lowest == invalid_decree) ? confirmed : std::min(lowest, confirmed);
    }
    return lowest;
}

// Remove the duplications that are not in the `new_dup_map`.
// NOTE: this function may be blocked when destroying replica_duplicator.
void replica_duplicator_manager::remove_non_existed_duplications(
Expand All @@ -80,5 +102,23 @@ void replica_duplicator_manager::remove_non_existed_duplications(
}
}

/// Records the confirmed decree announced by the primary. Does nothing unless this
/// replica is currently PS_SECONDARY.
///
/// \param confirmed decree reported by the primary; a negative value is treated as
///        "not duplicating" and leaves `_primary_confirmed_decree` untouched.
void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree confirmed)
{
    // this function always runs in the same single thread with config-sync
    if (_replica->status() != partition_status::PS_SECONDARY) {
        return;
    }

    const bool duplicating = confirmed >= 0;

    zauto_lock l(_lock);
    // the duplicators held locally are dropped whenever a secondary gets here
    remove_all_duplications();

    // confirmed decree never decreases
    if (duplicating && confirmed > _primary_confirmed_decree) {
        _primary_confirmed_decree = confirmed;
    }
    _replica->update_init_info_duplicating(duplicating);
}

} // namespace replication
} // namespace dsn
13 changes: 13 additions & 0 deletions src/dist/replication/lib/duplication/replica_duplicator_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ class replica_duplicator_manager : public replica_base
/// collect updated duplication confirm points from this replica.
std::vector<duplication_confirm_entry> get_duplication_confirms_to_update() const;

/// mutations <= min_confirmed_decree are assumed to be cleanable.
/// If there's no duplication, invalid_decree is returned, meaning that all logs are cleanable.
/// THREAD_POOL_REPLICATION
/// \see replica::on_checkpoint_timer()
decree min_confirmed_decree() const;

/// Updates the latest known confirmed decree on this replica if it's secondary.
/// THREAD_POOL_REPLICATION
/// \see replica_check.cpp
void update_confirmed_decree_if_secondary(decree confirmed);

private:
void sync_duplication(const duplication_entry &ent);

Expand All @@ -66,6 +77,8 @@ class replica_duplicator_manager : public replica_base

std::map<dupid_t, replica_duplicator_u_ptr> _duplications;

decree _primary_confirmed_decree{invalid_decree};

// avoid thread conflict between replica::on_checkpoint_timer and
// duplication_sync_timer.
mutable zlock _lock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,39 @@ class replica_duplicator_manager_test : public duplication_test_base
ASSERT_EQ(d._duplications.size(), 1);
}

void test_set_confirmed_decree_non_primary()
{
auto r = stub->add_primary_replica(2, 1);
auto &d = r->get_replica_duplicator_manager();

duplication_entry ent;
ent.dupid = 1;
ent.status = duplication_status::DS_PAUSE;
ent.remote = "dsn://slave-cluster";
ent.progress[r->get_gpid().get_partition_index()] = 100;
d.sync_duplication(ent);
ASSERT_EQ(d._duplications.size(), 1);
ASSERT_EQ(d._primary_confirmed_decree, invalid_decree);

// replica failover
r->as_secondary();

d.update_confirmed_decree_if_secondary(99);
ASSERT_EQ(d._duplications.size(), 0);
ASSERT_EQ(d._primary_confirmed_decree, 99);

// receives group check
d.update_confirmed_decree_if_secondary(101);
ASSERT_EQ(d._duplications.size(), 0);
ASSERT_EQ(d._primary_confirmed_decree, 101);

// confirmed decree never decreases
d.update_confirmed_decree_if_secondary(0);
ASSERT_EQ(d._primary_confirmed_decree, 101);
d.update_confirmed_decree_if_secondary(1);
ASSERT_EQ(d._primary_confirmed_decree, 101);
}

void test_get_duplication_confirms()
{
auto r = stub->add_primary_replica(2, 1);
Expand Down Expand Up @@ -68,17 +101,71 @@ class replica_duplicator_manager_test : public duplication_test_base
auto result = r->get_replica_duplicator_manager().get_duplication_confirms_to_update();
ASSERT_EQ(result.size(), update_dup_num);
}

// Table-driven check of replica_duplicator_manager::min_confirmed_decree().
//
// Each case installs one paused duplicator per listed confirmed decree, compares the
// manager's answer with the expected minimum, and then clears `_duplications` so the
// next case starts from an empty map.
void test_min_confirmed_decree()
{
    struct test_case
    {
        std::vector<int64_t> confirmed_decree;
        int64_t min_confirmed_decree;
    };

    auto r = stub->add_non_primary_replica(2, 1);
    // NOTE: a failing ASSERT_* inside the lambda only returns from the lambda; the
    // enclosing test body keeps running.
    auto assert_test = [r, this](const test_case &tt) {
        // dupids are assigned 1, 2, 3, ... in the order the decrees are listed
        int id = 0;
        for (int64_t confirmed : tt.confirmed_decree) {
            duplication_entry ent;
            ent.dupid = ++id;
            ent.status = duplication_status::DS_PAUSE;
            ent.progress[r->get_gpid().get_partition_index()] = 0;

            auto dup = make_unique<replica_duplicator>(ent, r);
            dup->update_progress(
                dup->progress().set_last_decree(confirmed).set_confirmed_decree(confirmed));
            add_dup(r, std::move(dup));
        }

        ASSERT_EQ(r->get_replica_duplicator_manager().min_confirmed_decree(),
                  tt.min_confirmed_decree);
        r->get_replica_duplicator_manager()._duplications.clear();
    };

    {
        // non-primary
        test_case tt{{1, 2, 3}, invalid_decree};
        assert_test(tt);
    }

    { // primary
        r->as_primary();
        test_case tt{{1, 2, 3}, 1};
        assert_test(tt);

        tt = {{1000}, 1000};
        assert_test(tt);

        tt = {{}, invalid_decree};
        assert_test(tt);
    }
}
};

// gtest registrations for replica_duplicator_manager_test.
// Each case only forwards to the fixture method that carries the actual checks.

// Forwards to test_get_duplication_confirms().
TEST_F(replica_duplicator_manager_test, get_duplication_confirms)
{
test_get_duplication_confirms();
}

// Forwards to test_set_confirmed_decree_non_primary().
TEST_F(replica_duplicator_manager_test, set_confirmed_decree_non_primary)
{
test_set_confirmed_decree_non_primary();
}

// Forwards to test_remove_non_existed_duplications().
TEST_F(replica_duplicator_manager_test, remove_non_existed_duplications)
{
test_remove_non_existed_duplications();
}

// Forwards to test_min_confirmed_decree().
TEST_F(replica_duplicator_manager_test, min_confirmed_decree) { test_min_confirmed_decree(); }

} // namespace replication
} // namespace dsn
123 changes: 123 additions & 0 deletions src/dist/replication/lib/duplication/test/replica_duplicator_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <dsn/utility/filesystem.h>

#include "dist/replication/lib/mutation_log_utils.h"
#include "dist/replication/lib/duplication/load_from_private_log.h"
#include "dist/replication/lib/duplication/duplication_pipeline.h"
#include "duplication_test_base.h"

namespace dsn {
namespace apps {

// for loading PUT mutations from log file.
// NOTE(review): presumably this stands in for the task code the real application
// defines, so that this test binary can resolve RPC_RRDB_RRDB_PUT on its own -- confirm.
DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT);

} // namespace apps
} // namespace dsn

namespace dsn {
namespace replication {

class replica_duplicator_test : public duplication_test_base
{
public:
replica_duplicator_test() { _replica->init_private_log(_log_dir); }

void test_new_duplicator()
{
dupid_t dupid = 1;
std::string remote = "remote_address";
duplication_status::type status = duplication_status::DS_PAUSE;
int64_t confirmed_decree = 100;

duplication_entry dup_ent;
dup_ent.dupid = dupid;
dup_ent.remote = remote;
dup_ent.status = status;
dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed_decree;

auto duplicator = make_unique<replica_duplicator>(dup_ent, _replica.get());
ASSERT_EQ(duplicator->id(), dupid);
ASSERT_EQ(duplicator->remote_cluster_name(), remote);
ASSERT_EQ(duplicator->_status, status);
ASSERT_EQ(duplicator->progress().confirmed_decree, confirmed_decree);
ASSERT_EQ(duplicator->progress().last_decree, confirmed_decree);

auto &expected_env = *duplicator;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash);
}

void test_pause_start_duplication()
{
mutation_log_ptr mlog = new mutation_log_private(
_replica->dir(), 4, _replica->get_gpid(), _replica.get(), 1024, 512, 10000);
EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);

{
_replica->init_private_log(mlog);
auto duplicator = create_test_duplicator();

duplicator->update_status_if_needed(duplication_status::DS_START);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);
auto expected_env = duplicator->_ship->_mutation_duplicator->_env;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);
duplicator->update_status_if_needed(duplication_status::DS_START);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);

duplicator->update_status_if_needed(duplication_status::DS_PAUSE);
ASSERT_TRUE(duplicator->paused());
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);
ASSERT_EQ(duplicator->_load_private.get(), nullptr);
ASSERT_EQ(duplicator->_load.get(), nullptr);
ASSERT_EQ(duplicator->_ship.get(), nullptr);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);

duplicator->wait_all();
}
}
};

// gtest registrations that forward to the fixture methods of replica_duplicator_test.
TEST_F(replica_duplicator_test, new_duplicator) { test_new_duplicator(); }

TEST_F(replica_duplicator_test, pause_start_duplication) { test_pause_start_duplication(); }

// Exercises update_progress(): last_decree and confirmed_decree can be advanced,
// while lowering confirmed_decree or pushing it past last_decree is rejected with
// ERR_INVALID_STATE.
TEST_F(replica_duplicator_test, duplication_progress)
{
    auto dup = create_test_duplicator();
    ASSERT_EQ(dup->progress().last_decree, 0); // start duplication from empty plog
    ASSERT_EQ(dup->progress().confirmed_decree, invalid_decree);

    // advancing last_decree alone leaves confirmed_decree untouched
    dup->update_progress(dup->progress().set_last_decree(10));
    ASSERT_EQ(dup->progress().last_decree, 10);
    ASSERT_EQ(dup->progress().confirmed_decree, invalid_decree);

    // confirmed_decree may catch up with last_decree
    dup->update_progress(dup->progress().set_confirmed_decree(10));
    ASSERT_EQ(dup->progress().confirmed_decree, 10);
    ASSERT_EQ(dup->progress().last_decree, 10);

    // moving confirmed_decree backwards is refused
    auto decreased = dup->update_progress(dup->progress().set_confirmed_decree(1));
    ASSERT_EQ(decreased,
              error_s::make(ERR_INVALID_STATE, "never decrease confirmed_decree: new(1) old(10)"));

    // confirmed_decree may not run ahead of last_decree
    auto overtaken = dup->update_progress(dup->progress().set_confirmed_decree(12));
    ASSERT_EQ(overtaken,
              error_s::make(ERR_INVALID_STATE,
                            "last_decree(10) should always larger than confirmed_decree(12)"));
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
bool verbose_commit_log() const;
dsn::task_tracker *tracker() { return &_tracker; }

/// \see replica_duplicate.cpp
replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); }
bool is_duplicating() const;
void update_init_info_duplicating(bool duplicating);

void update_last_checkpoint_generate_time();

Expand Down
22 changes: 15 additions & 7 deletions src/dist/replication/lib/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"

#include "dist/replication/lib/duplication/replica_duplicator_manager.h"

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>

namespace dsn {
Expand Down Expand Up @@ -91,6 +95,7 @@ void replica::broadcast_group_check()
request->node = addr;
_primary_states.get_replica_config(it->second, request->config);
request->last_committed_decree = last_committed_decree();
request->__set_confirmed_decree(_duplication_mgr->min_confirmed_decree());

if (request->config.status == partition_status::PS_POTENTIAL_SECONDARY) {
auto it = _primary_states.learners.find(addr);
Expand Down Expand Up @@ -133,13 +138,13 @@ void replica::on_group_check(const group_check_request &request,
{
_checker.only_one_thread_access();

ddebug("%s: process group check, primary = %s, ballot = %" PRId64
", status = %s, last_committed_decree = %" PRId64,
name(),
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree);
ddebug_replica("process group check, primary = {}, ballot = {}, status = {}, "
"last_committed_decree = {}, confirmed_decree = {}",
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree,
request.__isset.confirmed_decree ? request.confirmed_decree : invalid_decree);

if (request.config.ballot < get_ballot()) {
response.err = ERR_VERSION_OUTDATED;
Expand All @@ -154,6 +159,9 @@ void replica::on_group_check(const group_check_request &request,
} else if (is_same_ballot_status_change_allowed(status(), request.config.status)) {
update_local_configuration(request.config, true);
}
if (request.__isset.confirmed_decree) {
_duplication_mgr->update_confirmed_decree_if_secondary(request.confirmed_decree);
}

switch (status()) {
case partition_status::PS_INACTIVE:
Expand Down
Loading

0 comments on commit 503fdf6

Please sign in to comment.