Skip to content

Commit

Permalink
fix: Fix the corruption RocksDB instance will be reused bug (#1422)
Browse files Browse the repository at this point in the history
#1383

This patch deal with the error `kCorruption` returned from storage
engine of write requests. After replica server got such an error,
it will trash the replica to a trash path
`<app_id>.<pid>.pegasus.<timestamp>.err`.

Note that the replica server may crash because the corrupted replica
has been trashed and closed, it is left to be completed by another
patches.
  • Loading branch information
acelyc111 authored Apr 11, 2023
1 parent ababcc0 commit 9303c3a
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stdint.h>
#include <functional>
#include <gtest/gtest_prod.h>
#include <map>
#include <memory>
#include <set>
Expand Down Expand Up @@ -147,6 +148,7 @@ class fs_manager
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
} // dsn
7 changes: 7 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
// which is binded to this replication partition
//

#include <gtest/gtest_prod.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
Expand Down Expand Up @@ -78,6 +79,7 @@ namespace dsn {
class gpid;
class perf_counter;
class rpc_address;

namespace dist {
namespace block_service {
class block_filesystem;
Expand Down Expand Up @@ -520,6 +522,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void update_app_max_replica_count(int32_t max_replica_count);
void update_app_name(const std::string &app_name);

bool is_data_corrupted() const { return _data_corrupted; }

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand All @@ -540,6 +544,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_disk_migrate_test;
friend class open_replica_test;
friend class replica_follower;
FRIEND_TEST(replica_test, test_auto_trash);

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down Expand Up @@ -661,6 +666,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
disk_status::type _disk_status{disk_status::NORMAL};

bool _allow_ingest_behind{false};
// Indicate where the storage engine data is corrupted and unrecoverable.
bool _data_corrupted{false};
};
typedef dsn::ref_ptr<replica> replica_ptr;
} // namespace replication
Expand Down
4 changes: 4 additions & 0 deletions src/replica/replica_failover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ void replica::handle_local_failure(error_code error)
{
LOG_INFO_PREFIX("handle local failure error {}, status = {}", error, enum_to_string(status()));

if (error == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

if (status() == partition_status::PS_PRIMARY) {
_stub->remove_replica_on_meta_server(_app_info, _primary_states.membership);
}
Expand Down
10 changes: 9 additions & 1 deletion src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,10 @@ void replica::handle_learning_error(error_code err, bool is_local_error)
err,
is_local_error ? "local_error" : "remote error");

if (is_local_error && err == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

_stub->_counter_replicas_learning_recent_learn_fail_count->increment();

update_local_configuration_with_no_ballot_change(
Expand Down Expand Up @@ -1495,7 +1499,11 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
}

// TODO: assign the returned error_code to err and check it
_app->apply_mutation(mu);
auto ec = _app->apply_mutation(mu);
if (ec != ERR_OK) {
handle_local_failure(ec);
return;
}

// appends logs-in-cache into plog to ensure them can be duplicated.
// if current case is step back, it means the logs has been reserved
Expand Down
6 changes: 6 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,12 @@ void replica_stub::close_replica(replica_ptr r)
_counter_replicas_closing_count->decrement();
}

if (r->is_data_corrupted()) {
_fs_manager.remove_replica(id);
move_to_err_path(r->dir(), "trash replica");
_counter_replicas_recent_replica_move_error_count->increment();
}

LOG_INFO("{}: finish to close replica", name);
}

Expand Down
1 change: 1 addition & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class replica_follower_test;
friend class replica_http_service_test;
FRIEND_TEST(replica_test, test_clear_on_failure);
FRIEND_TEST(replica_test, test_auto_trash);

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand Down
9 changes: 8 additions & 1 deletion src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,14 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
// because the external sst files may not exist, in this case, we won't consider it as
// an error.
if (!has_ingestion_request) {
return ERR_LOCAL_APP_FAILURE;
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
case rocksdb::Status::kCorruption:
return ERR_RDB_CORRUPTION;
default:
return ERR_LOCAL_APP_FAILURE;
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ class replication_app_base : public replica_base

// Return code:
// - ERR_OK: everything is OK.
// - ERR_RDB_CORRUPTION: encountered some unrecoverable data errors, i.e. kCorruption from
// storage engine.
// - ERR_LOCAL_APP_FAILURE: other type of errors.
error_code apply_mutation(const mutation *mu);
error_code apply_mutation(const mutation *mu) WARN_UNUSED_RESULT;

// methods need to implement on storage engine side
virtual error_code start(int argc, char **argv) = 0;
Expand Down
26 changes: 15 additions & 11 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,17 +427,21 @@ replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
error_code ec;
int64_t offset;
// temp prepare_list used for apply states
prepare_list plist(_replica,
_replica->_app->last_committed_decree(),
FLAGS_max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree !=
_replica->_app->last_committed_decree() + 1) {
return;
}

_replica->_app->apply_mutation(mu);
});
prepare_list plist(
_replica,
_replica->_app->last_committed_decree(),
FLAGS_max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree != _replica->_app->last_committed_decree() + 1) {
return;
}

auto e = _replica->_app->apply_mutation(mu);
if (e != ERR_OK) {
LOG_ERROR_PREFIX("got an error({}) in commit stage of prepare_list", e);
return;
}
});

// replay private log
ec = mutation_log::replay(plog_files,
Expand Down
2 changes: 1 addition & 1 deletion src/replica/test/clear.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
# specific language governing permissions and limitations
# under the License.

rm -rf core.* data/ log.* replica.* tag* test* test_cluster/
rm -rf *.err core.* data/ log.* replica.* tag* test* test_cluster/
39 changes: 39 additions & 0 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "metadata_types.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "replica/disk_cleaner.h"
#include "replica/replica.h"
#include "replica/replica_http_service.h"
#include "replica/replica_stub.h"
Expand All @@ -59,6 +60,7 @@
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -480,6 +482,43 @@ TEST_F(replica_test, test_clear_on_failure)
ASSERT_FALSE(has_gpid(pid));
}

TEST_F(replica_test, test_auto_trash)
{
// Disable failure detector to avoid connecting with meta server which is not started.
FLAGS_fd_disabled = true;

replica *rep =
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
auto path = rep->dir();
dsn::utils::filesystem::create_directory(path);
ASSERT_TRUE(has_gpid(pid));

rep->handle_local_failure(ERR_RDB_CORRUPTION);
stub->wait_closing_replicas_finished();

ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
dir_node *dn = stub->get_fs_manager()->get_dir_node(path);
ASSERT_NE(dn, nullptr);
std::vector<std::string> subs;
ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, false));
bool found = false;
const int ts_length = 16;
size_t err_pos = path.size() + ts_length + 1; // Add 1 for dot in path.
for (const auto &sub : subs) {
if (sub.size() <= path.size()) {
continue;
}
uint64_t ts = 0;
if (sub.find(path) == 0 && sub.find(kFolderSuffixErr) == err_pos &&
dsn::buf2uint64(sub.substr(path.size() + 1, ts_length), ts)) {
found = true;
break;
}
}
ASSERT_TRUE(found);
ASSERT_FALSE(has_gpid(pid));
}

TEST_F(replica_test, update_deny_client_test)
{
struct update_deny_client_test
Expand Down
11 changes: 11 additions & 0 deletions src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@
#include "server/pegasus_write_service.h"
#include "utils/blob.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"

namespace pegasus {
namespace server {

DSN_DEFINE_int32(pegasus.server,
inject_write_error_for_test,
0,
"Which error code to inject in write path, 0 means no error. Only for test.");
DSN_TAG_VARIABLE(inject_write_error_for_test, FT_MUTABLE);

rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
Expand Down Expand Up @@ -154,6 +161,10 @@ int rocksdb_wrapper::write(int64_t decree)
{
CHECK_GT(_write_batch->Count(), 0);

if (dsn_unlikely(FLAGS_inject_write_error_for_test != rocksdb::Status::kOk)) {
return FLAGS_inject_write_error_for_test;
}

FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });

rocksdb::Status status =
Expand Down
Loading

0 comments on commit 9303c3a

Please sign in to comment.