Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

replication: add is_idempotent attr for task code; add allow_non_idem… #122

Merged
merged 2 commits into from
Jul 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ DEFINE_THREAD_POOL_CODE(THREAD_POOL_LOCAL_APP)
DEFINE_THREAD_POOL_CODE(THREAD_POOL_REPLICATION_LONG)
DEFINE_THREAD_POOL_CODE(THREAD_POOL_COMPACT)

#define DEFINE_STORAGE_WRITE_RPC_CODE(x, allow_batch) \
DEFINE_STORAGE_RPC_CODE(x, TASK_PRIORITY_LOW, THREAD_POOL_REPLICATION, true, allow_batch)
#define DEFINE_STORAGE_WRITE_RPC_CODE(x, allow_batch, is_idempotent) \
DEFINE_STORAGE_RPC_CODE( \
x, TASK_PRIORITY_LOW, THREAD_POOL_REPLICATION, true, allow_batch, is_idempotent)
#define DEFINE_STORAGE_READ_RPC_CODE(x) \
DEFINE_STORAGE_RPC_CODE(x, TASK_PRIORITY_COMMON, THREAD_POOL_LOCAL_APP, false, true)
DEFINE_STORAGE_RPC_CODE(x, TASK_PRIORITY_COMMON, THREAD_POOL_LOCAL_APP, false, true, true)

#define MAKE_EVENT_CODE(x, pri) DEFINE_TASK_CODE(x, pri, CURRENT_THREAD_POOL)
#define MAKE_EVENT_CODE_AIO(x, pri) DEFINE_TASK_CODE_AIO(x, pri, CURRENT_THREAD_POOL)
Expand Down
25 changes: 19 additions & 6 deletions include/dsn/tool-api/task_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class task_code
dsn_task_priority_t pri,
dsn::threadpool_code pool,
bool is_storage_write,
bool allow_batch);
bool allow_batch,
bool is_idempotent);

const char *to_string() const;

Expand Down Expand Up @@ -153,7 +154,9 @@ class task_code
// 2. for a write rpc, a primary may also need to replicate it
// to secondaries before forwarding to the storage engine.
// 3. some storage engine's rpc shouldn't be batched,
// either for better performance or correctness
// either for better performance or correctness.
// 4. some write rpc is idempotent, but some is not.
// we should differentiate it.
// so we define some specical fields in task_spec to mark these features.
//
// please refer to rpc_engine::on_recv_request for the detailes on how storage_engine's rpc
Expand All @@ -162,11 +165,21 @@ class task_code
// Notice we dispatch storage rpc's response to THREAD_POOL_DEFAULT,
// the reason is that the storage rpc's response mainly runs at client side, which is not
// necessary to start so many threadpools
#define DEFINE_STORAGE_RPC_CODE(x, pri, pool, is_write, allow_batch) \
#define DEFINE_STORAGE_RPC_CODE(x, pri, pool, is_write, allow_batch, is_idempotent) \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

把上面的注释也跟着修改一下

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

__selectany const ::dsn::task_code x( \
#x, TASK_TYPE_RPC_REQUEST, pri, pool, is_write, allow_batch); \
__selectany const ::dsn::task_code x##_ACK( \
#x "_ACK", TASK_TYPE_RPC_RESPONSE, pri, THREAD_POOL_DEFAULT, is_write, allow_batch);
#x, TASK_TYPE_RPC_REQUEST, pri, pool, is_write, allow_batch, is_idempotent); \
__selectany const ::dsn::task_code x##_ACK(#x "_ACK", \
TASK_TYPE_RPC_RESPONSE, \
pri, \
THREAD_POOL_DEFAULT, \
is_write, \
allow_batch, \
is_idempotent);

#define ALLOW_BATCH true
#define NOT_ALLOW_BATCH false
#define IS_IDEMPOTENT true
#define NOT_IDEMPOTENT false

// define a default task code "task_code_invalid", it's mainly used for representing
// some error status when you want to return task_code in some functions.
Expand Down
4 changes: 3 additions & 1 deletion include/dsn/tool-api/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ class task_spec : public extensible_object<task_spec, 4>
dsn_task_priority_t pri,
dsn::threadpool_code pool,
bool is_write_operation,
bool allow_batch);
bool allow_batch,
bool is_idempotent);

public:
// not configurable [
Expand All @@ -176,6 +177,7 @@ class task_spec : public extensible_object<task_spec, 4>
bool rpc_request_for_storage;
bool rpc_request_is_write_operation; // need stateful replication
bool rpc_request_is_write_allow_batch; // if write allow batch
bool rpc_request_is_write_idempotent; // if write operation is idempotent
// ]

// configurable [
Expand Down
4 changes: 2 additions & 2 deletions src/apps/skv/simple_kv.code.definition.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ namespace replication {
namespace application {

DEFINE_STORAGE_READ_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_READ)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, true)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, true)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, ALLOW_BATCH, IS_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, ALLOW_BATCH, NOT_IDEMPOTENT)

// test timer task code
DEFINE_TASK_CODE(LPC_SIMPLE_KV_TEST_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
Expand Down
7 changes: 4 additions & 3 deletions src/core/core/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ void partition_resolver_simple::on_access_failure(int partition_index, error_cod
&&
err != ERR_NOT_ENOUGH_MEMBER // primary won't change and we only r/w on primary in this
// provider
&&
err != ERR_OPERATION_DISABLED // operation disabled
) {
ddebug("clear partition configuration cache %d.%d due to access failure %s",
_app_id,
Expand Down Expand Up @@ -397,9 +399,8 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
} else {
call(std::move(req), true);
}
} else if (err == ERR_HANDLER_NOT_FOUND) {
end_request(std::move(req), err, rpc_address());
} else if (err == ERR_APP_NOT_EXIST) {
} else if (err == ERR_HANDLER_NOT_FOUND || err == ERR_APP_NOT_EXIST ||
err == ERR_OPERATION_DISABLED) {
end_request(std::move(req), err, rpc_address());
} else {
call(std::move(req), true);
Expand Down
3 changes: 2 additions & 1 deletion src/core/core/rpc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,8 @@ void rpc_engine::call_uri(rpc_address addr, message_ex *request, const rpc_respo
dsn::error_code err, dsn_message_t req, dsn_message_t resp) {
message_ex *req2 = (message_ex *)req;
if (req2->header->gpid.value() != 0 && err != ERR_OK &&
err != ERR_HANDLER_NOT_FOUND && err != ERR_APP_NOT_EXIST) {
err != ERR_HANDLER_NOT_FOUND && err != ERR_APP_NOT_EXIST &&
err != ERR_OPERATION_DISABLED) {
auto resolver = req2->server_address.uri_address()->get_resolver();
if (nullptr != resolver) {
resolver->on_access_failure(req2->header->gpid.get_partition_index(), err);
Expand Down
6 changes: 4 additions & 2 deletions src/core/core/task_code.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ task_code::task_code(const char *name,
dsn_task_priority_t pri,
dsn::threadpool_code pool,
bool is_storage_write,
bool allow_batch)
bool allow_batch,
bool is_idempotent)
: task_code(name)
{
task_spec::register_storage_task_code(*this, tt, pri, pool, is_storage_write, allow_batch);
task_spec::register_storage_task_code(
*this, tt, pri, pool, is_storage_write, allow_batch, is_idempotent);
}

const char *task_code::to_string() const
Expand Down
5 changes: 4 additions & 1 deletion src/core/core/task_spec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ void task_spec::register_storage_task_code(task_code code,
dsn_task_priority_t pri,
threadpool_code pool,
bool is_write_operation,
bool allow_batch)
bool allow_batch,
bool is_idempotent)
{
register_task_code(code, type, pri, pool);
task_spec *spec = task_spec::get(code);
spec->rpc_request_for_storage = true;
spec->rpc_request_is_write_operation = is_write_operation;
spec->rpc_request_is_write_allow_batch = allow_batch;
spec->rpc_request_is_write_idempotent = is_idempotent;
}

task_spec *task_spec::get(int code)
Expand All @@ -126,6 +128,7 @@ task_spec::task_spec(int code,
rpc_request_for_storage(false),
rpc_request_is_write_operation(false),
rpc_request_is_write_allow_batch(false),
rpc_request_is_write_idempotent(false),
priority(pri),
pool_code(pool),
rpc_call_header_format(NET_HDR_DSN),
Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/client_lib/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ replication_options::replication_options()
verbose_commit_log_on_start = false;
delay_for_fd_timeout_on_start = false;
empty_write_disabled = false;
allow_non_idempotent_write = false;

prepare_timeout_ms_for_secondaries = 1000;
prepare_timeout_ms_for_potential_secondaries = 3000;
Expand Down Expand Up @@ -256,6 +257,11 @@ void replication_options::initialize()
"empty_write_disabled",
empty_write_disabled,
"whether to disable empty write, default is false");
allow_non_idempotent_write =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个option用tudo或者issue追踪一下,表示这是一个临时方案

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dsn_config_get_value_bool("replication",
"allow_non_idempotent_write",
allow_non_idempotent_write,
"whether to allow non-idempotent write, default is false");

prepare_timeout_ms_for_secondaries = (int)dsn_config_get_value_uint64(
"replication",
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/client_lib/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class replication_options
bool verbose_commit_log_on_start;
bool delay_for_fd_timeout_on_start;
bool empty_write_disabled;
bool allow_non_idempotent_write;

int32_t prepare_timeout_ms_for_secondaries;
int32_t prepare_timeout_ms_for_potential_secondaries;
Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ void replica::on_client_write(task_code code, dsn_message_t request)
{
check_hashed_access();

task_spec *spec = task_spec::get(code);
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
response_client_message(false, request, ERR_OPERATION_DISABLED);
return;
}

if (partition_status::PS_PRIMARY != status()) {
response_client_message(false, request, ERR_INVALID_STATE);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ namespace replication {
namespace test {

DEFINE_STORAGE_READ_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_READ)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, true)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, true)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, ALLOW_BATCH, IS_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, ALLOW_BATCH, NOT_IDEMPOTENT)

// test timer task code
DEFINE_TASK_CODE(LPC_SIMPLE_KV_TEST_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
Expand Down