Skip to content

Commit

Permalink
fix(graceful_exit): adjust the destructing order (XiaoMi#843)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyifan27 committed Jul 1, 2021
1 parent 3fce156 commit 215e99f
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 38 deletions.
1 change: 1 addition & 0 deletions include/dsn/tool-api/timer_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class timer_service
virtual ~timer_service() = default;

virtual void start() = 0;
virtual void stop() = 0;

// after milliseconds, the provider should call task->enqueue()
virtual void add_timer(task *task) = 0;
Expand Down
6 changes: 5 additions & 1 deletion src/aio/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ GTEST_API_ int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
dsn_run_config("config.ini", false);
return RUN_ALL_TESTS();
int g_test_ret = RUN_ALL_TESTS();
#ifndef ENABLE_GCOV
dsn_exit(g_test_ret);
#endif
return g_test_ret;
}
9 changes: 8 additions & 1 deletion src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,17 @@ meta_service::meta_service()
_access_controller = security::create_meta_access_controller();
}

meta_service::~meta_service()
meta_service::~meta_service() { stop(); }

void meta_service::stop()
{
zauto_write_lock l(_meta_lock);
if (!_started.load()) {
return;
}
_tracker.cancel_outstanding_tasks();
unregister_ctrl_commands();
_started = false;
}

bool meta_service::check_freeze() const
Expand Down
1 change: 1 addition & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class meta_service : public serverlet<meta_service>
virtual ~meta_service();

error_code start();
void stop();

const replication_options &get_options() const { return _opts; }
const meta_options &get_meta_options() const { return _meta_opts; }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_service_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ error_code meta_service_app::start(const std::vector<std::string> &args)

error_code meta_service_app::stop(bool /*cleanup*/)
{
_service.reset(nullptr);
_service->stop();
return ERR_OK;
}
} // namespace service
Expand Down
14 changes: 13 additions & 1 deletion src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_max_concurrent_bulk_load_downloading_count(5),
_learn_app_concurrent_count(0),
_fs_manager(false),
_bulk_load_downloading_count(0)
_bulk_load_downloading_count(0),
_is_running(false)
{
#ifdef DSN_ENABLE_GPERF
_release_tcmalloc_memory_command = nullptr;
Expand Down Expand Up @@ -792,6 +793,10 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f

void replica_stub::initialize_start()
{
if (_is_running) {
return;
}

// start timer for configuration sync
if (!_options.config_sync_disabled) {
_config_sync_timer_task =
Expand Down Expand Up @@ -841,6 +846,8 @@ void replica_stub::initialize_start()
} else {
_state = NS_Connected;
}

_is_running = true;
}

dsn::error_code replica_stub::on_kill_replica(gpid id)
Expand Down Expand Up @@ -2457,6 +2464,10 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &args,

void replica_stub::close()
{
if (!_is_running) {
return;
}

_tracker.cancel_outstanding_tasks();

// this replica may not be opened
Expand Down Expand Up @@ -2561,6 +2572,7 @@ void replica_stub::close()
_replicas.erase(_replicas.begin());
}
}
_is_running = false;
}

std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new)
Expand Down
2 changes: 2 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// replica count exectuting bulk load downloading concurrently
std::atomic_int _bulk_load_downloading_count;

bool _is_running;

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
Expand Down
1 change: 1 addition & 0 deletions src/runtime/rpc/rpc_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class rpc_engine
//
::dsn::error_code start(const service_app_spec &spec);
void start_serving() { _is_serving = true; }
void stop_serving() { _is_serving = false; }

//
// rpc registrations
Expand Down
36 changes: 20 additions & 16 deletions src/runtime/service_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "runtime/task/task_engine.h"
#include "runtime/rpc/rpc_engine.h"

#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/tool-api/env_provider.h>
Expand Down Expand Up @@ -165,7 +166,12 @@ rpc_request_task *service_node::generate_intercepted_request_task(message_ex *re
return t;
}

service_node::~service_node() = default;
service_node::~service_node()
{
_rpc->stop_serving();
stop_app(false);
_computation->stop();
}

//////////////////////////////////////////////////////////////////////////////////////////

Expand All @@ -188,6 +194,8 @@ service_engine::service_engine()

service_engine::~service_engine()
{
_nodes_by_app_id.clear();

UNREGISTER_VALID_HANDLER(_get_runtime_info_cmd);
UNREGISTER_VALID_HANDLER(_get_queue_info_cmd);
}
Expand Down Expand Up @@ -218,32 +226,28 @@ void service_engine::init_after_toollets()

void service_engine::start_node(service_app_spec &app_spec)
{
std::unordered_map<int, std::string> app_name_by_port;
auto it = _nodes_by_app_id.find(app_spec.id);
if (it == _nodes_by_app_id.end()) {
for (auto p : app_spec.ports) {
// union to existing node if any port is shared
if (_nodes_by_app_port.find(p) != _nodes_by_app_port.end()) {
service_node *n = _nodes_by_app_port[p];

dassert(false,
"network port %d usage confliction for %s vs %s, "
"please reconfig",
p,
n->full_name(),
app_spec.full_name.c_str());
auto it = app_name_by_port.find(p);
if (it != app_name_by_port.end()) {
dassert_f(false,
"network port {} usage confliction for {} vs {}, "
"please reconfig",
p,
it->second,
app_spec.full_name);
}
app_name_by_port.emplace(p, app_spec.full_name);
}

auto node = std::make_shared<service_node>(app_spec);
error_code err = node->start();
dassert(err == ERR_OK, "service node start failed, err = %s", err.to_string());
dassert_f(err == ERR_OK, "service node start failed, err = {}", err.to_string());

_nodes_by_app_id[node->id()] = node;
for (auto p1 : node->spec().ports) {
_nodes_by_app_port[p1] = node.get();
}

return;
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/runtime/service_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,8 @@ class service_engine : public utils::singleton<service_engine>

bool _simulator;

// <port, servicenode>
typedef std::map<int, service_node *>
node_engines_by_port; // multiple ports may share the same node
// map app_id to service_node
service_nodes_by_app_id _nodes_by_app_id;
node_engines_by_port _nodes_by_app_port;
};

// ------------ inline impl ---------------------
Expand Down
14 changes: 12 additions & 2 deletions src/runtime/task/simple_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ namespace dsn {
namespace tools {

simple_timer_service::simple_timer_service(service_node *node, timer_service *inner_provider)
: timer_service(node, inner_provider)
: timer_service(node, inner_provider), _is_running(false)
{
}

void simple_timer_service::start()
{
if (_is_running) {
return;
}

_worker = std::thread([this]() {
task::set_tls_dsn_context(node(), nullptr);

Expand All @@ -53,12 +57,18 @@ void simple_timer_service::start()
false, "io_service in simple_timer_service run failed: %s", ec.message().data());
}
});
_is_running = true;
}

simple_timer_service::~simple_timer_service()
void simple_timer_service::stop()
{
if (!_is_running) {
return;
}

_ios.stop();
_worker.join();
_is_running = false;
}

void simple_timer_service::add_timer(task *task)
Expand Down
5 changes: 4 additions & 1 deletion src/runtime/task/simple_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ class simple_timer_service : public timer_service
public:
simple_timer_service(service_node *node, timer_service *inner_provider);

~simple_timer_service() override;
~simple_timer_service() override { stop(); }

// after milliseconds, the provider should call task->enqueue()
virtual void add_timer(task *task) override;

virtual void start() override;

virtual void stop() override;

private:
boost::asio::io_service _ios;
std::thread _worker;
bool _is_running;
};

} // namespace tools
Expand Down
57 changes: 46 additions & 11 deletions src/runtime/task/task_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
* THE SOFTWARE.
*/

#include <dsn/dist/fmt_logging.h>

#include "task_engine.h"

using namespace dsn::utils;
Expand Down Expand Up @@ -78,23 +80,42 @@ void task_worker_pool::start()
if (_is_running)
return;

for (auto &tsvc : _per_queue_timer_svcs)
for (auto &tsvc : _per_queue_timer_svcs) {
tsvc->start();
for (auto &wk : _workers)
}
for (auto &wk : _workers) {
wk->start();
}

ddebug("[%s] thread pool [%s] started, pool_code = %s, worker_count = %d, worker_share_core = "
"%s, partitioned = %s, ...",
_node->full_name(),
_spec.name.c_str(),
_spec.pool_code.to_string(),
_spec.worker_count,
_spec.worker_share_core ? "true" : "false",
_spec.partitioned ? "true" : "false");
ddebug_f(
"[{}]: thread pool [{}] started, pool_code = {}, worker_count = {}, worker_share_core = "
"{}, partitioned = {}, ...",
_node->full_name(),
_spec.name,
_spec.pool_code.to_string(),
_spec.worker_count,
_spec.worker_share_core ? "true" : "false",
_spec.partitioned ? "true" : "false");

_is_running = true;
}

void task_worker_pool::stop()
{
if (!_is_running) {
return;
}

for (auto &tsvc : _per_queue_timer_svcs) {
tsvc->stop();
}
for (auto &wk : _workers) {
wk->stop();
}
_is_running = false;
ddebug_f("[{}]: thread pool {} stopped", _node->full_name(), _spec.name);
}

void task_worker_pool::add_timer(task *t)
{
dassert(t->delay_milliseconds() > 0,
Expand Down Expand Up @@ -213,8 +234,22 @@ void task_engine::start()
if (pl)
pl->start();
}

_is_running = true;
ddebug_f("[{}]: task engine started", _node->full_name());
}

void task_engine::stop()
{
if (!_is_running) {
return;
}

for (auto &pl : _pools) {
if (pl)
pl->stop();
}
_is_running = false;
ddebug_f("[{}]: task engine stopped", _node->full_name());
}

volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code, int hash)
Expand Down
3 changes: 3 additions & 0 deletions src/runtime/task/task_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class task_worker_pool
// service management
void create();
void start();
void stop();

// task procecessing
void enqueue(task *task);
Expand Down Expand Up @@ -95,12 +96,14 @@ class task_engine
{
public:
task_engine(service_node *node);
~task_engine() { stop(); }

//
// service management routines
//
void create(const std::list<dsn::threadpool_code> &pools);
void start();
void stop();

//
// task management routines
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/task/task_engine.sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class sim_timer_service : public timer_service
virtual void add_timer(task *task) override;

virtual void start() override {}

virtual void stop() override {}
};

class sim_task_queue : public task_queue
Expand Down

0 comments on commit 215e99f

Please sign in to comment.