diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp index 7df2d13ca7..27c3138280 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp @@ -60,6 +60,28 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) } } +decree replica_duplicator_manager::min_confirmed_decree() const +{ + zauto_lock l(_lock); + // stays invalid_decree when no confirmed decree is available on this replica + decree min_decree = invalid_decree; + if (_replica->status() == partition_status::PS_PRIMARY) { + bool first = true; + for (auto &kv : _duplications) { + // take the minimum over ALL duplications: invalid_decree must not double as the + // "nothing seen yet" marker, or an unconfirmed duplication would be overwritten + const decree confirmed = kv.second->progress().confirmed_decree; + min_decree = first ? confirmed : std::min(min_decree, confirmed); + first = false; + } + } else if (_primary_confirmed_decree > 0) { + // if the replica is not primary, use the latest known (from primary) + // confirmed_decree instead. + min_decree = _primary_confirmed_decree; + } + return min_decree; +} + // Remove the duplications that are not in the `new_dup_map`. // NOTE: this function may be blocked when destroying replica_duplicator. 
void replica_duplicator_manager::remove_non_existed_duplications( @@ -80,5 +102,27 @@ void replica_duplicator_manager::remove_non_existed_duplications( } } +void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree confirmed) +{ + // this function always runs in the same single thread with config-sync + if (_replica->status() != partition_status::PS_SECONDARY) { + return; + } + + zauto_lock l(_lock); + remove_all_duplications(); + if (confirmed >= 0) { + // confirmed decree never decreases + if (_primary_confirmed_decree < confirmed) { + _primary_confirmed_decree = confirmed; + } + } else { + // the primary reports no confirmed decree: drop the remembered value, otherwise + // min_confirmed_decree() would keep returning it and hold back private log gc + _primary_confirmed_decree = invalid_decree; + } + _replica->update_init_info_duplicating(confirmed >= 0); +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.h b/src/dist/replication/lib/duplication/replica_duplicator_manager.h index 5a82e011f3..28e06b808c 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator_manager.h +++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.h @@ -43,6 +43,17 @@ class replica_duplicator_manager : public replica_base /// collect updated duplication confirm points from this replica. std::vector get_duplication_confirms_to_update() const; + /// mutations <= min_confirmed_decree are assumed to be cleanable. + /// If there's no duplication, invalid_decree is returned, meaning that all logs are cleanable. + /// THREAD_POOL_REPLICATION + /// \see replica::on_checkpoint_timer() + decree min_confirmed_decree() const; + + /// Updates the latest known confirmed decree on this replica if it's secondary. 
+ /// THREAD_POOL_REPLICATION + /// \see replica_check.cpp + void update_confirmed_decree_if_secondary(decree confirmed); + private: void sync_duplication(const duplication_entry &ent); @@ -66,6 +77,8 @@ class replica_duplicator_manager : public replica_base std::map _duplications; + decree _primary_confirmed_decree{invalid_decree}; + // avoid thread conflict between replica::on_checkpoint_timer and // duplication_sync_timer. mutable zlock _lock; diff --git a/src/dist/replication/lib/duplication/test/replica_duplicator_manager_test.cpp b/src/dist/replication/lib/duplication/test/replica_duplicator_manager_test.cpp index 79616014d9..e07e5563b5 100644 --- a/src/dist/replication/lib/duplication/test/replica_duplicator_manager_test.cpp +++ b/src/dist/replication/lib/duplication/test/replica_duplicator_manager_test.cpp @@ -36,6 +36,39 @@ class replica_duplicator_manager_test : public duplication_test_base ASSERT_EQ(d._duplications.size(), 1); } + void test_set_confirmed_decree_non_primary() + { + auto r = stub->add_primary_replica(2, 1); + auto &d = r->get_replica_duplicator_manager(); + + duplication_entry ent; + ent.dupid = 1; + ent.status = duplication_status::DS_PAUSE; + ent.remote = "dsn://slave-cluster"; + ent.progress[r->get_gpid().get_partition_index()] = 100; + d.sync_duplication(ent); + ASSERT_EQ(d._duplications.size(), 1); + ASSERT_EQ(d._primary_confirmed_decree, invalid_decree); + + // replica failover: the primary is switched to secondary + r->as_secondary(); + + d.update_confirmed_decree_if_secondary(99); + ASSERT_EQ(d._duplications.size(), 0); + ASSERT_EQ(d._primary_confirmed_decree, 99); + + // receives group check carrying a larger confirmed decree + d.update_confirmed_decree_if_secondary(101); + ASSERT_EQ(d._duplications.size(), 0); + ASSERT_EQ(d._primary_confirmed_decree, 101); + + // confirmed decree never decreases: smaller non-negative values are ignored + d.update_confirmed_decree_if_secondary(0); + ASSERT_EQ(d._primary_confirmed_decree, 101); + d.update_confirmed_decree_if_secondary(1); + ASSERT_EQ(d._primary_confirmed_decree, 101); + } + void 
test_get_duplication_confirms() { auto r = stub->add_primary_replica(2, 1); @@ -68,6 +101,53 @@ class replica_duplicator_manager_test : public duplication_test_base auto result = r->get_replica_duplicator_manager().get_duplication_confirms_to_update(); ASSERT_EQ(result.size(), update_dup_num); } + + void test_min_confirmed_decree() + { + struct test_case + { + std::vector confirmed_decree; + int64_t min_confirmed_decree; + }; + + auto r = stub->add_non_primary_replica(2, 1); + auto assert_test = [r, this](test_case tt) { + for (int id = 1; id <= static_cast<int>(tt.confirmed_decree.size()); id++) { + duplication_entry ent; + ent.dupid = id; + ent.status = duplication_status::DS_PAUSE; + ent.progress[r->get_gpid().get_partition_index()] = 0; + + auto dup = make_unique(ent, r); + dup->update_progress(dup->progress() + .set_last_decree(tt.confirmed_decree[id - 1]) + .set_confirmed_decree(tt.confirmed_decree[id - 1])); + add_dup(r, std::move(dup)); + } + + ASSERT_EQ(r->get_replica_duplicator_manager().min_confirmed_decree(), + tt.min_confirmed_decree); + r->get_replica_duplicator_manager()._duplications.clear(); + }; + + { + // non-primary: local duplications are not consulted, so invalid_decree is expected + test_case tt{{1, 2, 3}, invalid_decree}; + assert_test(tt); + } + + { // primary: the minimum over all local duplications is expected + r->as_primary(); + test_case tt{{1, 2, 3}, 1}; + assert_test(tt); + + tt = {{1000}, 1000}; + assert_test(tt); + + tt = {{}, invalid_decree}; + assert_test(tt); + } + } }; TEST_F(replica_duplicator_manager_test, get_duplication_confirms) @@ -75,10 +155,17 @@ TEST_F(replica_duplicator_manager_test, get_duplication_confirms) test_get_duplication_confirms(); } +TEST_F(replica_duplicator_manager_test, set_confirmed_decree_non_primary) +{ + test_set_confirmed_decree_non_primary(); +} + TEST_F(replica_duplicator_manager_test, remove_non_existed_duplications) { test_remove_non_existed_duplications(); } +TEST_F(replica_duplicator_manager_test, min_confirmed_decree) { test_min_confirmed_decree(); } + } // namespace replication } // namespace dsn diff --git 
a/src/dist/replication/lib/duplication/test/replica_duplicator_test.cpp b/src/dist/replication/lib/duplication/test/replica_duplicator_test.cpp new file mode 100644 index 0000000000..eb077a60fc --- /dev/null +++ b/src/dist/replication/lib/duplication/test/replica_duplicator_test.cpp @@ -0,0 +1,123 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include + +#include "dist/replication/lib/mutation_log_utils.h" +#include "dist/replication/lib/duplication/load_from_private_log.h" +#include "dist/replication/lib/duplication/duplication_pipeline.h" +#include "duplication_test_base.h" + +namespace dsn { +namespace apps { + +// defines the RPC task code needed for loading PUT mutations from log file. +DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT); + +} // namespace apps +} // namespace dsn + +namespace dsn { +namespace replication { + +class replica_duplicator_test : public duplication_test_base +{ +public: + replica_duplicator_test() { _replica->init_private_log(_log_dir); } + + void test_new_duplicator() + { + dupid_t dupid = 1; + std::string remote = "remote_address"; + duplication_status::type status = duplication_status::DS_PAUSE; + int64_t confirmed_decree = 100; + + duplication_entry dup_ent; + dup_ent.dupid = dupid; + dup_ent.remote = remote; + dup_ent.status = status; + dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed_decree; + + auto duplicator = make_unique(dup_ent, _replica.get()); + ASSERT_EQ(duplicator->id(), dupid); + ASSERT_EQ(duplicator->remote_cluster_name(), remote); + ASSERT_EQ(duplicator->_status, status); + ASSERT_EQ(duplicator->progress().confirmed_decree, confirmed_decree); + ASSERT_EQ(duplicator->progress().last_decree, confirmed_decree); + + auto &expected_env = *duplicator; + ASSERT_EQ(duplicator->tracker(), 
expected_env.__conf.tracker); + ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash); + } + + void test_pause_start_duplication() + { + mutation_log_ptr mlog = new mutation_log_private( + _replica->dir(), 4, _replica->get_gpid(), _replica.get(), 1024, 512, 10000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + { + _replica->init_private_log(mlog); + auto duplicator = create_test_duplicator(); + + duplicator->update_status_if_needed(duplication_status::DS_START); + ASSERT_EQ(duplicator->_status, duplication_status::DS_START); + auto expected_env = duplicator->_ship->_mutation_duplicator->_env; + ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker); + ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash); + + // corner cases: DS_INIT or a repeated DS_START must not change the DS_START status + duplicator->update_status_if_needed(duplication_status::DS_INIT); + ASSERT_EQ(duplicator->_status, duplication_status::DS_START); + duplicator->update_status_if_needed(duplication_status::DS_START); + ASSERT_EQ(duplicator->_status, duplication_status::DS_START); + + duplicator->update_status_if_needed(duplication_status::DS_PAUSE); + ASSERT_TRUE(duplicator->paused()); + ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE); + ASSERT_EQ(duplicator->_load_private.get(), nullptr); + ASSERT_EQ(duplicator->_load.get(), nullptr); + ASSERT_EQ(duplicator->_ship.get(), nullptr); + + // corner cases: DS_INIT must not change the DS_PAUSE status + duplicator->update_status_if_needed(duplication_status::DS_INIT); + ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE); + + // corner cases: repeating DS_INIT still leaves the status at DS_PAUSE + duplicator->update_status_if_needed(duplication_status::DS_INIT); + ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE); + + duplicator->wait_all(); + } + } +}; + +TEST_F(replica_duplicator_test, new_duplicator) { test_new_duplicator(); } + +TEST_F(replica_duplicator_test, pause_start_duplication) { test_pause_start_duplication(); } + 
+TEST_F(replica_duplicator_test, duplication_progress) +{ + auto duplicator = create_test_duplicator(); + ASSERT_EQ(duplicator->progress().last_decree, 0); // duplication starts from empty plog + ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree); + + duplicator->update_progress(duplicator->progress().set_last_decree(10)); + ASSERT_EQ(duplicator->progress().last_decree, 10); + ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree); + + duplicator->update_progress(duplicator->progress().set_confirmed_decree(10)); + ASSERT_EQ(duplicator->progress().confirmed_decree, 10); + ASSERT_EQ(duplicator->progress().last_decree, 10); + + ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(1)), + error_s::make(ERR_INVALID_STATE, "never decrease confirmed_decree: new(1) old(10)")); + + ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(12)), + error_s::make(ERR_INVALID_STATE, + "last_decree(10) should always larger than confirmed_decree(12)")); +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 665f4bdf01..ca611b96c2 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -170,7 +170,10 @@ class replica : public serverlet, public ref_counter, public replica_ba bool verbose_commit_log() const; dsn::task_tracker *tracker() { return &_tracker; } + /// duplication-related members, \see replica_duplicate.cpp replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); } + bool is_duplicating() const; + void update_init_info_duplicating(bool duplicating); void update_last_checkpoint_generate_time(); diff --git a/src/dist/replication/lib/replica_check.cpp b/src/dist/replication/lib/replica_check.cpp index d5bdc9ec56..aac8363403 100644 --- a/src/dist/replication/lib/replica_check.cpp +++ b/src/dist/replication/lib/replica_check.cpp @@ -37,6 +37,10 @@ #include "mutation.h" 
#include "mutation_log.h" #include "replica_stub.h" + +#include "dist/replication/lib/duplication/replica_duplicator_manager.h" + +#include #include namespace dsn { @@ -91,6 +95,7 @@ void replica::broadcast_group_check() request->node = addr; _primary_states.get_replica_config(it->second, request->config); request->last_committed_decree = last_committed_decree(); + request->__set_confirmed_decree(_duplication_mgr->min_confirmed_decree()); if (request->config.status == partition_status::PS_POTENTIAL_SECONDARY) { auto it = _primary_states.learners.find(addr); @@ -133,13 +138,13 @@ void replica::on_group_check(const group_check_request &request, { _checker.only_one_thread_access(); - ddebug("%s: process group check, primary = %s, ballot = %" PRId64 - ", status = %s, last_committed_decree = %" PRId64, - name(), - request.config.primary.to_string(), - request.config.ballot, - enum_to_string(request.config.status), - request.last_committed_decree); + ddebug_replica("process group check, primary = {}, ballot = {}, status = {}, " + "last_committed_decree = {}, confirmed_decree = {}", + request.config.primary.to_string(), + request.config.ballot, + enum_to_string(request.config.status), + request.last_committed_decree, + request.__isset.confirmed_decree ? 
request.confirmed_decree : invalid_decree); if (request.config.ballot < get_ballot()) { response.err = ERR_VERSION_OUTDATED; @@ -154,6 +159,9 @@ void replica::on_group_check(const group_check_request &request, } else if (is_same_ballot_status_change_allowed(status(), request.config.status)) { update_local_configuration(request.config, true); } + if (request.__isset.confirmed_decree) { + _duplication_mgr->update_confirmed_decree_if_secondary(request.confirmed_decree); + } switch (status()) { case partition_status::PS_INACTIVE: diff --git a/src/dist/replication/lib/replica_chkpt.cpp b/src/dist/replication/lib/replica_chkpt.cpp index 3d9c5e2438..69d9156d40 100644 --- a/src/dist/replication/lib/replica_chkpt.cpp +++ b/src/dist/replication/lib/replica_chkpt.cpp @@ -37,14 +37,16 @@ #include "mutation.h" #include "mutation_log.h" #include "replica_stub.h" +#include "duplication/replica_duplicator_manager.h" #include #include #include +#include namespace dsn { namespace replication { -// run in replica thread +// ThreadPool: THREAD_POOL_REPLICATION void replica::on_checkpoint_timer() { _checker.only_one_thread_access(); @@ -67,11 +69,41 @@ void replica::on_checkpoint_timer() if (_private_log) { mutation_log_ptr plog = _private_log; - decree durable_decree = _app->last_durable_decree(); + + decree last_durable_decree = _app->last_durable_decree(); + decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree(); + decree cleanable_decree = last_durable_decree; + if (min_confirmed_decree >= 0) { + if (min_confirmed_decree < last_durable_decree) { + ddebug_replica("gc_private {}: delay gc for duplication: min_confirmed_decree({}) " + "last_durable_decree({})", + enum_to_string(status()), + min_confirmed_decree, + last_durable_decree); + cleanable_decree = min_confirmed_decree; + } else { + ddebug_replica("gc_private {}: min_confirmed_decree({}) last_durable_decree({})", + enum_to_string(status()), + min_confirmed_decree, + last_durable_decree); + } + } else { + // 
protect the logs from being truncated + // if this app is in duplication + if (is_duplicating()) { + // unsure if the logs can be dropped, because min_confirmed_decree + // is currently unavailable + ddebug_replica( + "gc_private {}: skip gc because confirmed duplication progress is unknown", + enum_to_string(status())); + return; + } + } + int64_t valid_start_offset = _app->init_info().init_offset_in_private_log; tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS, &_tracker, - [this, plog, durable_decree, valid_start_offset] { + [this, plog, cleanable_decree, valid_start_offset] { // run in background thread to avoid file deletion operation blocking // replication thread. if (status() == partition_status::PS_ERROR || @@ -79,7 +111,7 @@ void replica::on_checkpoint_timer() return; plog->garbage_collection( get_gpid(), - durable_decree, + cleanable_decree, valid_start_offset, (int64_t)_options->log_private_reserve_max_size_mb * 1024 * 1024, (int64_t)_options->log_private_reserve_max_time_seconds); @@ -90,7 +122,7 @@ void replica::on_checkpoint_timer() } } -// run in replica thread +// ThreadPool: THREAD_POOL_REPLICATION void replica::init_checkpoint(bool is_emergency) { // only applicable to primary and secondary replicas @@ -408,5 +440,5 @@ void replica::on_checkpoint_completed(error_code err) update_last_checkpoint_generate_time(); } } -} -} // namespace +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica_duplicate.cpp b/src/dist/replication/lib/replica_duplicate.cpp new file mode 100644 index 0000000000..afaec16c46 --- /dev/null +++ b/src/dist/replication/lib/replica_duplicate.cpp @@ -0,0 +1,43 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software 
without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "replica.h" + +#include + +namespace dsn { +namespace replication { + +// `duplicating` is actually a field in app_info, but on the principle +// of changing app_info as little as possible, we store it in .init_info +// instead. NOTE: both functions below are placeholders (TBD) for now. + +void replica::update_init_info_duplicating(bool duplicating) { /*TBD*/} + +bool replica::is_duplicating() const { /*TBD*/ return false; } + +} // namespace replication +} // namespace dsn