Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ddl_client): sleep for a while before retry once meta server is busy #1453

Merged
merged 23 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7762625
feat(new_metrics): retry after waiting for a fixed interval while re…
empiredan Apr 13, 2023
4f0affe
feat(new_metrics): retry after waiting for a fixed interval while re…
empiredan Apr 14, 2023
56952a8
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 17, 2023
89a08f1
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 17, 2023
6cdd5bd
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 17, 2023
4cff3ac
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
cea029d
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
7916325
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
520bed2
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
e79ef2f
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
b0cb7db
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
279bcbb
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
958b5eb
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 18, 2023
75a6dca
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 19, 2023
5e1b2e4
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 19, 2023
44f50a2
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 19, 2023
e6080aa
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 20, 2023
c3dd72c
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 20, 2023
7f151dd
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 21, 2023
d6be548
fix(ddl_client): after receiving ERR_BUSY_CREATING or ERR_BUSY_DROPPI…
empiredan Apr 21, 2023
4d7d5f7
fix(ddl_client): sleep for a while before retry once meta server is busy
empiredan Apr 22, 2023
b39c7c7
fix(ddl_client): sleep for a while before retry once meta server is busy
empiredan Apr 23, 2023
7d26c9d
fix(ddl_client): sleep for a while before retry once meta server is busy
empiredan Apr 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/lint_and_test_cpp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ jobs:
- detect_hotspot_test
- dsn_aio_test
- dsn_block_service_test
- dsn_client_test
- dsn.failure_detector.tests
- dsn_http_test
- dsn_meta_state_tests
Expand Down Expand Up @@ -297,6 +298,7 @@ jobs:
- detect_hotspot_test
- dsn_aio_test
- dsn_block_service_test
- dsn_client_test
- dsn.failure_detector.tests
- dsn_http_test
- dsn_meta_state_tests
Expand Down Expand Up @@ -424,6 +426,7 @@ jobs:
# - detect_hotspot_test
# - dsn_aio_test
# - dsn_block_service_test
# - dsn_client_test
# - dsn.failure_detector.tests
# - dsn_http_test
# - dsn_meta_state_tests
Expand Down
1 change: 1 addition & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ function run_test()
detect_hotspot_test
dsn_aio_test
dsn_block_service_test
dsn_client_test
dsn.failure_detector.tests
dsn_http_test
dsn_meta_state_tests
Expand Down
2 changes: 2 additions & 0 deletions src/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ set(MY_PROJ_LIBS "")
set(MY_BINPLACES "")

dsn_add_static_library()

add_subdirectory(test)
207 changes: 111 additions & 96 deletions src/client/replication_ddl_client.cpp

Large diffs are not rendered by default.

110 changes: 102 additions & 8 deletions src/client/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@

#pragma once

#include <gtest/gtest_prod.h>
#include <stdint.h>
#include <chrono>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand All @@ -50,6 +54,14 @@
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils/string_view.h"

DSN_DECLARE_uint32(ddl_client_max_attempt_count);
DSN_DECLARE_uint32(ddl_client_retry_interval_ms);

namespace dsn {
class gpid;
Expand Down Expand Up @@ -257,32 +269,99 @@ class replication_ddl_client
bool static valid_app_char(int c);

void end_meta_request(const rpc_response_task_ptr &callback,
int retry_times,
error_code err,
uint32_t attempt_count,
const error_code &err,
dsn::message_ex *request,
dsn::message_ex *resp);

// Serializes `*req` into a request message for RPC `code`, sends it to `_meta_server` and
// returns the response task created for it. The reply callback hands the result over to
// end_meta_request().
//
// NOTE(review): this span is a rendered diff without +/- markers, so every removed line is
// immediately followed by its replacement (e.g. the two signature lines right below, the two
// definitions of `msg` and of `task`, and the two calls to end_meta_request()).
template <typename TRequest>
rpc_response_task_ptr request_meta(dsn::task_code code,
rpc_response_task_ptr request_meta(const dsn::task_code &code,
std::shared_ptr<TRequest> &req,
int timeout_milliseconds = 0,
int reply_thread_hash = 0)
{
dsn::message_ex *msg = dsn::message_ex::create_request(code, timeout_milliseconds);
::dsn::marshall(msg, *req);
auto msg = dsn::message_ex::create_request(code, timeout_milliseconds);
dsn::marshall(msg, *req);

rpc_response_task_ptr task = ::dsn::rpc::create_rpc_response_task(
msg, nullptr, empty_rpc_handler, reply_thread_hash);
auto task =
dsn::rpc::create_rpc_response_task(msg, nullptr, empty_rpc_handler, reply_thread_hash);
// The callback captures `task` by value and is `mutable` so that it can move the captured
// task into end_meta_request().
rpc::call(_meta_server,
msg,
&_tracker,
[this, task](
error_code err, dsn::message_ex *request, dsn::message_ex *response) mutable {
end_meta_request(std::move(task), 0, err, request, response);

// Test hook: the lambda overwrites `err` with the next mocked error from
// pop_mock_error(). Presumably it only runs while the "ddl_client_request_meta"
// fail point is enabled -- confirm against utils/fail_point.h.
FAIL_POINT_INJECT_NOT_RETURN_F(
"ddl_client_request_meta",
[&err, this](dsn::string_view str) { err = pop_mock_error(); });

// The first delivery is reported as attempt 1 (the replaced call above passed 0).
end_meta_request(std::move(task), 1, err, request, response);
});
return task;
}

// Tells whether `err` is one of the two "busy" errors, i.e. ERR_BUSY_CREATING or
// ERR_BUSY_DROPPING.
static inline bool is_busy(const dsn::error_code &err)
{
    if (err == dsn::ERR_BUSY_CREATING) {
        return true;
    }
    return err == dsn::ERR_BUSY_DROPPING;
}

/// Send a request to meta server and wait for its response, attempting again as long as the
/// `err` field of the response is a busy error (see is_busy()).
///
/// At least one and at most FLAGS_ddl_client_max_attempt_count attempts are made; between two
/// consecutive attempts the calling thread sleeps for FLAGS_ddl_client_retry_interval_ms
/// milliseconds.
///
/// \param code                  RPC code of the request.
/// \param req                   the request, passed to request_meta() for every attempt.
/// \param resp                  output; the response of the latest attempt is deserialized
///                              into it. `TResponse` must have an `err` field.
/// \param timeout_milliseconds  passed through to request_meta() for every attempt.
/// \param reply_thread_hash     passed through to request_meta() for every attempt.
/// \return the response task of the last attempt that has been made. The caller should check
///         its error() first (failure of sending/receiving), then `resp.err`. Once every
///         attempt is answered with a busy error, `resp.err` is still that busy error.
template <typename TRequest, typename TResponse>
rpc_response_task_ptr request_meta_and_wait_response(const dsn::task_code &code,
                                                     std::shared_ptr<TRequest> &req,
                                                     TResponse &resp,
                                                     int timeout_milliseconds = 0,
                                                     int reply_thread_hash = 0)
{
    // Read the flag only once so that the loop bound, the log and the "last attempt" check
    // below always agree. Also make sure at least one attempt is launched: with the flag
    // configured as 0 the loop would never run and a null task would be returned without
    // any request having been sent.
    const uint32_t max_attempt_count =
        FLAGS_ddl_client_max_attempt_count > 0 ? FLAGS_ddl_client_max_attempt_count : 1;

    rpc_response_task_ptr resp_task;
    for (uint32_t i = 1; i <= max_attempt_count; ++i) {
        resp_task = request_meta(code, req, timeout_milliseconds, reply_thread_hash);
        resp_task->wait();

        // Failed to send request to meta server. The possible reason might be:
        // * cannot connect to meta server (such as ERR_NETWORK_FAILURE);
        // * do not receive any response from meta server (such as ERR_TIMEOUT)
        // Such failures are never retried here.
        if (resp_task->error() != dsn::ERR_OK) {
            return resp_task;
        }

        // Once response is nullptr, it must be mocked by unit tests since network is
        // not connected.
        if (dsn_likely(resp_task->get_response() != nullptr)) {
            // Received the response from meta server successfully, thus deserialize the
            // response.
            dsn::unmarshall(resp_task->get_response(), resp);
        }

        // Test hook: the lambda replaces `resp.err` with the next mocked error.
        FAIL_POINT_INJECT_NOT_RETURN_F(
            "ddl_client_request_meta",
            [&resp, this](dsn::string_view str) { resp.err = pop_mock_error(); });

        LOG_INFO("received response from meta server: rpc_code={}, err={}, attempt_count={}, "
                 "max_attempt_count={}",
                 code,
                 resp.err,
                 i,
                 max_attempt_count);

        // Once `err` field in the received response is ERR_OK or some non-busy error, do not
        // attempt again.
        if (resp.err == dsn::ERR_OK || !is_busy(resp.err)) {
            return resp_task;
        }

        // Would not sleep for the last attempt.
        if (i < max_attempt_count) {
            LOG_WARNING("sleep {} milliseconds before launch another attempt for {}: err={}",
                        FLAGS_ddl_client_retry_interval_ms,
                        code,
                        resp.err);
            std::this_thread::sleep_for(
                std::chrono::milliseconds(FLAGS_ddl_client_retry_interval_ms));
        }
    }

    // All attempts have been answered with a busy error.
    return resp_task;
}

/// Send request to meta server synchronously.
template <typename TRpcHolder, typename TResponse = typename TRpcHolder::response_type>
error_with<TResponse> call_rpc_sync(TRpcHolder rpc, int reply_thread_hash = 0)
Expand Down Expand Up @@ -345,6 +424,21 @@ class replication_ddl_client
dsn::task_tracker _tracker;
uint32_t _max_wait_secs = 3600; // Wait at most 1 hour by default.

// Used only for unit tests.
FRIEND_TEST(DDLClientTest, RetryMetaRequest);
// Replace the queue of mocked errors with the elements of `mock_errors`, keeping their order.
void set_mock_errors(const std::vector<dsn::error_code> &mock_errors)
{
    _mock_errors.clear();
    _mock_errors.insert(_mock_errors.end(), mock_errors.cbegin(), mock_errors.cend());
}
// Remove the oldest mocked error from the queue and return it. The queue is checked to be
// non-empty beforehand.
dsn::error_code pop_mock_error()
{
    CHECK_FALSE(_mock_errors.empty());

    dsn::error_code head = _mock_errors.front();
    _mock_errors.pop_front();
    return head;
}
std::deque<dsn::error_code> _mock_errors;

typedef rpc_holder<detect_hotkey_request, detect_hotkey_response> detect_hotkey_rpc;
typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
typedef rpc_holder<add_new_disk_request, add_new_disk_response> add_new_disk_rpc;
Expand Down
39 changes: 39 additions & 0 deletions src/client/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

set(MY_PROJ_NAME dsn_client_test)
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved

# No source file is listed explicitly for this test project.
set(MY_PROJ_SRC "")

# NOTE(review): presumably tells the build helper to collect the sources of this directory
# by globbing -- confirm against the definition of dsn_add_test().
set(MY_SRC_SEARCH_MODE "GLOB")

# Libraries of the test binary: the client library under test, the libraries listed along
# with it, and gtest.
set(MY_PROJ_LIBS
dsn_client
dsn_replication_common
dsn_runtime
dsn_utils
gtest
)

# Boost targets needed by the test binary.
set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)

# Files of this directory that accompany the test binary (presumably copied next to it by
# dsn_add_test() -- TODO confirm).
set(MY_BINPLACES
"${CMAKE_CURRENT_SOURCE_DIR}/run.sh"
"${CMAKE_CURRENT_SOURCE_DIR}/config.ini"
)

# Takes no arguments; presumably it is configured through the MY_* variables set above.
dsn_add_test()
83 changes: 83 additions & 0 deletions src/client/test/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
; Licensed to the Apache Software Foundation (ASF) under one
; or more contributor license agreements. See the NOTICE file
; distributed with this work for additional information
; regarding copyright ownership. The ASF licenses this file
; to you under the Apache License, Version 2.0 (the
; "License"); you may not use this file except in compliance
; with the License. You may obtain a copy of the License at
;
; http://www.apache.org/licenses/LICENSE-2.0
;
; Unless required by applicable law or agreed to in writing,
; software distributed under the License is distributed on an
; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
; KIND, either express or implied. See the License for the
; specific language governing permissions and limitations
; under the License.
[apps..default]
run = true
count = 1
;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536
;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536
;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536

[apps.mimic]
type = dsn.app.mimic
arguments =
pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER
run = true
count = 1

[core]
;tool = simulator
tool = nativerun
;toollets = tracer
;toollets = tracer, profiler, fault_injector
pause_on_start = false

logging_start_level = LOG_LEVEL_INFO
logging_factory_name = dsn::tools::simple_logger
;logging_factory_name = dsn::tools::screen_logger
logging_flush_on_exit = false

enable_default_app_mimic = true

data_dir = ./pegasus_shell.data

[tools.simple_logger]
short_header = false
fast_flush = true
max_number_of_log_files_on_disk = 10
stderr_start_level = LOG_LEVEL_FATAL

[tools.simulator]
random_seed = 0

[network]
; how many network threads for network library(used by asio)
io_service_worker_count = 4

; specification for each thread pool
[threadpool..default]
worker_count = 4
partitioned = false
worker_priority = THREAD_xPRIORITY_NORMAL

[threadpool.THREAD_POOL_DEFAULT]
name = default
worker_count = 20

[threadpool.THREAD_POOL_META_SERVER]
name = meta_server

[task..default]
is_trace = false
is_profile = false
allow_inline = false
rpc_call_header_format = NET_HDR_DSN
rpc_call_channel = RPC_CHANNEL_TCP
rpc_timeout_milliseconds = 10000

[pegasus.clusters]
@CLUSTER_NAME@ = @CLUSTER_ADDRESS@

Loading