Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: add fault injection in write path #117

Merged
merged 13 commits into from
Jul 13, 2018
2 changes: 1 addition & 1 deletion rdsn
3 changes: 3 additions & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class pegasus_server_write : public dsn::replication::replica_base
/// A returned error is regarded as a failure of the replica and will therefore
/// trigger cluster membership changes. Make sure no error is returned merely
/// because of an invalid user argument.
/// As long as the returned error is 0, the operation is guaranteed to have been
/// successfully applied to rocksdb, which means an empty_put will be issued
/// even if there's no write.
int on_batched_write_requests(dsn_message_t *requests,
int count,
int64_t decree,
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class pegasus_write_service

private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;

class impl;
std::unique_ptr<impl> _impl;
Expand Down
15 changes: 15 additions & 0 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@

#include "base/pegasus_key_schema.h"

#include <dsn/utility/fail_point.h>
#include <dsn/utility/string_conv.h>

namespace pegasus {
namespace server {

/// Internal error codes used for fail injection. Each code is the value
/// returned by the fail point of the matching name in the write path of this
/// file: "db_write_batch_put" returns FAIL_DB_WRITE_BATCH_PUT,
/// "db_write_batch_delete" returns FAIL_DB_WRITE_BATCH_DELETE and "db_write"
/// returns FAIL_DB_WRITE.
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;

class pegasus_write_service::impl : public dsn::replication::replica_base
{
public:
Expand Down Expand Up @@ -229,6 +235,9 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
dsn::string_view value,
uint32_t expire_sec)
{
FAIL_POINT_INJECT_F("db_write_batch_put",
[](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });

rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue =
Expand All @@ -250,6 +259,9 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

int db_write_batch_delete(int64_t decree, dsn::string_view raw_key)
{
FAIL_POINT_INJECT_F("db_write_batch_delete",
[](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; });

rocksdb::Status s = _batch.Delete(utils::to_rocksdb_slice(raw_key));
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
Expand All @@ -269,6 +281,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
{
dassert(_batch.Count() != 0, "");

FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });

_wt_opts->given_decree = static_cast<uint64_t>(decree);
auto status = _db->Write(*_wt_opts, &_batch);
if (!status.ok()) {
Expand Down Expand Up @@ -304,6 +318,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;

const std::string _primary_address;
const uint32_t _value_schema_version;
Expand Down
2 changes: 2 additions & 0 deletions src/server/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ set(MY_PROJ_LIBS
rocksdb
pegasus.base
gtest
boost_regex
z bz2 snappy rt aio pthread
)
add_definitions(-DPEGASUS_UNIT_TEST)
add_definitions(-DENABLE_FAIL)

set(MY_BOOST_PACKAGES system filesystem)

Expand Down
3 changes: 1 addition & 2 deletions src/server/test/message_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ inline dsn_message_t create_put_request(const dsn::apps::update_request &request

/// Builds a received RPC_RRDB_RRDB_REMOVE message carrying `key`, using
/// dsn::from_thrift_request_to_received_message to produce the message.
inline dsn_message_t create_remove_request(const dsn::blob &key)
{
    return dsn::from_thrift_request_to_received_message(key, dsn::apps::RPC_RRDB_RRDB_REMOVE);
}

inline dsn_message_t create_incr_request(const dsn::apps::incr_request &request)
Expand Down
109 changes: 109 additions & 0 deletions src/server/test/pegasus_server_write_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "pegasus_server_test_base.h"
#include "message_utils.h"
#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service_impl.h"
#include "base/pegasus_key_schema.h"

#include <dsn/utility/fail_point.h>
#include <dsn/utility/defer.h>

namespace pegasus {
namespace server {

/// Test fixture that drives pegasus_server_write::on_batched_write_requests
/// while failures are injected into the write path through fail points.
class pegasus_server_write_test : public pegasus_server_test_base
{
    std::unique_ptr<pegasus_server_write> _server_write;

public:
    pegasus_server_write_test() : pegasus_server_test_base()
    {
        _server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
    }

    /// Issues 1000 batches of mixed put/remove requests with the three write
    /// path fail points enabled, and checks after every batch that the error
    /// is one of the expected codes, that all batch state has been reset, and
    /// that every mocked RPC received a consistent response.
    void test_batch_writes()
    {
        dsn::fail::setup();
        // ASSERT_* leaves this function early on failure; run the teardown on
        // scope exit so the fail point registry is always released.
        auto teardown = dsn::defer([]() { dsn::fail::teardown(); });

        // The names must match the FAIL_POINT_INJECT_F names in
        // pegasus_write_service_impl.h, otherwise the injection never fires.
        dsn::fail::cfg("db_write_batch_put", "10%return()");
        dsn::fail::cfg("db_write_batch_delete", "10%return()");
        dsn::fail::cfg("db_write", "10%return()");

        for (int decree = 1; decree <= 1000; decree++) {
            RPC_MOCKING(put_rpc) RPC_MOCKING(remove_rpc)
            {
                dsn::blob key;
                pegasus_generate_key(key, std::string("hash"), std::string("sort"));
                dsn::apps::update_request req;
                req.key = key;
                req.value.assign("value", 0, 5);

                // Layout of `writes`: [0, put_rpc_cnt) are puts, the rest are removes.
                int put_rpc_cnt = dsn_random32(1, 10);
                int remove_rpc_cnt = dsn_random32(1, 10);
                int total_rpc_cnt = put_rpc_cnt + remove_rpc_cnt;
                auto writes = new dsn_message_t[total_rpc_cnt];
                for (int i = 0; i < put_rpc_cnt; i++) {
                    writes[i] = pegasus::create_put_request(req);
                }
                for (int i = put_rpc_cnt; i < total_rpc_cnt; i++) {
                    writes[i] = pegasus::create_remove_request(key);
                }
                auto cleanup = dsn::defer([=]() {
                    for (int i = 0; i < total_rpc_cnt; i++) {
                        dsn_msg_release_ref(writes[i]);
                    }
                    // Allocated with new[], so it must be released with delete[].
                    delete[] writes;
                });

                int err =
                    _server_write->on_batched_write_requests(writes, total_rpc_cnt, decree, 0);
                switch (err) {
                case FAIL_DB_WRITE_BATCH_PUT:
                case FAIL_DB_WRITE_BATCH_DELETE:
                case FAIL_DB_WRITE:
                case 0:
                    break;
                default:
                    ASSERT_TRUE(false) << "unacceptable error: " << err;
                }

                // make sure everything is cleanup after batch write.
                ASSERT_TRUE(_server_write->_put_rpc_batch.empty());
                ASSERT_TRUE(_server_write->_remove_rpc_batch.empty());
                ASSERT_TRUE(_server_write->_write_svc->_batch_qps_perfcounters.empty());
                ASSERT_TRUE(_server_write->_write_svc->_batch_latency_perfcounters.empty());
                ASSERT_EQ(_server_write->_write_svc->_batch_start_time, 0);
                ASSERT_EQ(_server_write->_write_svc->_impl->_batch.Count(), 0);
                ASSERT_EQ(_server_write->_write_svc->_impl->_update_responses.size(), 0);

                // Every request must have produced exactly one mocked reply,
                // all carrying the batch-wide error code and decree.
                ASSERT_EQ(put_rpc::mail_box().size(), put_rpc_cnt);
                ASSERT_EQ(remove_rpc::mail_box().size(), remove_rpc_cnt);
                for (auto &rpc : put_rpc::mail_box()) {
                    verify_response(rpc.response(), err, decree);
                }
                for (auto &rpc : remove_rpc::mail_box()) {
                    verify_response(rpc.response(), err, decree);
                }
            }
        }
    }

    /// Checks that `response` carries the expected error code and decree, and
    /// that its app id, partition index and server address match this replica.
    void verify_response(const dsn::apps::update_response &response, int err, int64_t decree)
    {
        ASSERT_EQ(response.error, err);
        ASSERT_EQ(response.app_id, _gpid.get_app_id());
        ASSERT_EQ(response.partition_index, _gpid.get_partition_index());
        ASSERT_EQ(response.decree, decree);
        ASSERT_EQ(response.server, _server_write->_write_svc->_impl->_primary_address);
    }
};

// Runs the fixture's batched-write scenario as the gtest case `batch_writes`.
TEST_F(pegasus_server_write_test, batch_writes)
{
    test_batch_writes();
}

} // namespace server
} // namespace pegasus
73 changes: 57 additions & 16 deletions src/server/test/pegasus_write_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class pegasus_write_service_test : public pegasus_server_test_base

void test_multi_put()
{
dsn::fail::setup();

dsn::apps::multi_put_request request;
dsn::apps::update_response response;

Expand All @@ -34,8 +36,8 @@ class pegasus_write_service_test : public pegasus_server_test_base
// alarm for empty request
request.hash_key = dsn::blob(hash_key.data(), 0, hash_key.size());
int err = _write_svc->multi_put(decree, request, response);
ASSERT_EQ(response.error, rocksdb::Status::kInvalidArgument);
ASSERT_EQ(err, 0);
verify_response(response, rocksdb::Status::kInvalidArgument, decree);

constexpr int kv_num = 100;
std::string sort_key[kv_num];
Expand All @@ -52,11 +54,27 @@ class pegasus_write_service_test : public pegasus_server_test_base
request.kvs.back().value.assign(value[i].data(), 0, value[i].size());
}

_write_svc->multi_put(decree, request, response);
ASSERT_EQ(response.error, 0);
ASSERT_EQ(response.app_id, _gpid.get_app_id());
ASSERT_EQ(response.partition_index, _gpid.get_partition_index());
ASSERT_EQ(response.decree, decree);
{
dsn::fail::cfg("db_write_batch_put", "100%1*return()");
err = _write_svc->multi_put(decree, request, response);
ASSERT_EQ(err, rocksdb::Status::kCorruption);
verify_response(response, err, decree);
}

{
dsn::fail::cfg("db_write", "100%1*return()");
err = _write_svc->multi_put(decree, request, response);
ASSERT_EQ(err, rocksdb::Status::kCorruption);
verify_response(response, err, decree);
}

{ // success
err = _write_svc->multi_put(decree, request, response);
ASSERT_EQ(err, 0);
verify_response(response, 0, decree);
}

dsn::fail::teardown();
}

void test_multi_remove()
Expand All @@ -70,8 +88,8 @@ class pegasus_write_service_test : public pegasus_server_test_base
// alarm for empty request
request.hash_key = dsn::blob(hash_key.data(), 0, hash_key.size());
int err = _write_svc->multi_remove(decree, request, response);
ASSERT_EQ(response.error, rocksdb::Status::kInvalidArgument);
ASSERT_EQ(err, 0);
verify_response(response, rocksdb::Status::kInvalidArgument, decree);

constexpr int kv_num = 100;
std::string sort_key[kv_num];
Expand All @@ -85,11 +103,25 @@ class pegasus_write_service_test : public pegasus_server_test_base
request.sort_keys.back().assign(sort_key[i].data(), 0, sort_key[i].size());
}

_write_svc->multi_remove(decree, request, response);
ASSERT_EQ(response.error, 0);
ASSERT_EQ(response.app_id, _gpid.get_app_id());
ASSERT_EQ(response.partition_index, _gpid.get_partition_index());
ASSERT_EQ(response.decree, decree);
{
dsn::fail::cfg("db_write_batch_delete", "100%1*return()");
err = _write_svc->multi_remove(decree, request, response);
ASSERT_EQ(err, rocksdb::Status::kCorruption);
verify_response(response, err, decree);
}

{
dsn::fail::cfg("db_write", "100%1*return()");
err = _write_svc->multi_remove(decree, request, response);
ASSERT_EQ(err, rocksdb::Status::kCorruption);
verify_response(response, err, decree);
}

{ // success
err = _write_svc->multi_remove(decree, request, response);
ASSERT_EQ(err, 0);
verify_response(response, 0, decree);
}
}

void test_batched_writes()
Expand Down Expand Up @@ -125,12 +157,21 @@ class pegasus_write_service_test : public pegasus_server_test_base
}

for (const dsn::apps::update_response &resp : responses) {
ASSERT_EQ(resp.error, 0);
ASSERT_EQ(resp.app_id, _gpid.get_app_id());
ASSERT_EQ(resp.partition_index, _gpid.get_partition_index());
ASSERT_EQ(resp.decree, decree);
verify_response(resp, 0, decree);
}
}

/// Checks that `response` carries the expected error code and decree together
/// with this replica's app id, partition index and primary address, and that
/// the write service has no pending batch entries or queued update responses.
template <typename TResponse>
void verify_response(const TResponse &response, int err, int64_t decree)
{
    // Fields carried by the response itself.
    ASSERT_EQ(response.error, err);
    ASSERT_EQ(response.app_id, _gpid.get_app_id());
    ASSERT_EQ(response.partition_index, _gpid.get_partition_index());
    ASSERT_EQ(response.decree, decree);

    // Remaining checks read the write service implementation's internals.
    const auto &svc_impl = *_write_svc->_impl;
    ASSERT_EQ(response.server, svc_impl._primary_address);
    ASSERT_EQ(svc_impl._batch.Count(), 0);
    ASSERT_EQ(svc_impl._update_responses.size(), 0);
}
};

TEST_F(pegasus_write_service_test, multi_put) { test_multi_put(); }
Expand Down