diff --git a/include/dsn/tool-api/timer_service.h b/include/dsn/tool-api/timer_service.h index 47e88ca718..d2866cd007 100644 --- a/include/dsn/tool-api/timer_service.h +++ b/include/dsn/tool-api/timer_service.h @@ -64,6 +64,7 @@ class timer_service virtual ~timer_service() = default; virtual void start() = 0; + virtual void stop() = 0; // after milliseconds, the provider should call task->enqueue() virtual void add_timer(task *task) = 0; diff --git a/src/aio/test/main.cpp b/src/aio/test/main.cpp index 67d12167e2..037952ce46 100644 --- a/src/aio/test/main.cpp +++ b/src/aio/test/main.cpp @@ -9,5 +9,9 @@ GTEST_API_ int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); dsn_run_config("config.ini", false); - return RUN_ALL_TESTS(); + int g_test_ret = RUN_ALL_TESTS(); +#ifndef ENABLE_GCOV + dsn_exit(g_test_ret); +#endif + return g_test_ret; } diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index 7c8649fd28..ac7a034e6c 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -81,10 +81,17 @@ meta_service::meta_service() _access_controller = security::create_meta_access_controller(); } -meta_service::~meta_service() +meta_service::~meta_service() { stop(); } + +void meta_service::stop() { + zauto_write_lock l(_meta_lock); + if (!_started.load()) { + return; + } _tracker.cancel_outstanding_tasks(); unregister_ctrl_commands(); + _started = false; } bool meta_service::check_freeze() const diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index e26268b19e..a7618c169e 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -73,6 +73,7 @@ class meta_service : public serverlet virtual ~meta_service(); error_code start(); + void stop(); const replication_options &get_options() const { return _opts; } const meta_options &get_meta_options() const { return _meta_opts; } diff --git a/src/meta/meta_service_app.cpp b/src/meta/meta_service_app.cpp index d612e005d1..3abcfcc97a 100644 --- a/src/meta/meta_service_app.cpp +++ b/src/meta/meta_service_app.cpp @@ -104,7 +104,7 @@ error_code meta_service_app::start(const std::vector &args) error_code meta_service_app::stop(bool /*cleanup*/) { - _service.reset(nullptr); + _service->stop(); return ERR_OK; } } // namespace service diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 0f8e5e2d30..7a526c8cf7 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -85,7 +85,8 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _max_concurrent_bulk_load_downloading_count(5), _learn_app_concurrent_count(0), _fs_manager(false), - _bulk_load_downloading_count(0) + _bulk_load_downloading_count(0), + _is_running(false) { #ifdef DSN_ENABLE_GPERF _release_tcmalloc_memory_command = nullptr; @@ -792,6 +793,10 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f void replica_stub::initialize_start() { + if (_is_running) { + return; + } + // start timer for configuration sync if (!_options.config_sync_disabled) { _config_sync_timer_task = @@ -841,6 +846,8 @@ void replica_stub::initialize_start() } else { _state = NS_Connected; } + + _is_running = true; } dsn::error_code replica_stub::on_kill_replica(gpid id) @@ -2457,6 +2464,10 @@ replica_stub::exec_command_on_replica(const std::vector &args, void replica_stub::close() { + if (!_is_running) { + return; + } + _tracker.cancel_outstanding_tasks(); // this replica may not be opened @@ -2561,6 +2572,7 @@ void replica_stub::close() _replicas.erase(_replicas.begin()); } } + _is_running = false; } std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new) diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 060a335e00..bd6afe3cd9 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -396,6 +396,8 @@ class replica_stub : public serverlet, public ref_counter // replica count exectuting bulk load downloading concurrently std::atomic_int _bulk_load_downloading_count; + bool _is_running; + // performance counters perf_counter_wrapper _counter_replicas_count; perf_counter_wrapper _counter_replicas_opening_count; diff --git a/src/runtime/rpc/rpc_engine.h b/src/runtime/rpc/rpc_engine.h index 3e33fe23f6..1a3dce947a 100644 --- a/src/runtime/rpc/rpc_engine.h +++ b/src/runtime/rpc/rpc_engine.h @@ -137,6 +137,7 @@ class rpc_engine // ::dsn::error_code start(const service_app_spec &spec); void start_serving() { _is_serving = true; } + void stop_serving() { _is_serving = false; } // // rpc registrations diff --git a/src/runtime/service_engine.cpp b/src/runtime/service_engine.cpp index bb4e73b137..dc7eb315a0 100644 --- a/src/runtime/service_engine.cpp +++ b/src/runtime/service_engine.cpp @@ -28,6 +28,7 @@ #include "runtime/task/task_engine.h" #include "runtime/rpc/rpc_engine.h" +#include #include #include #include @@ -165,7 +166,12 @@ rpc_request_task *service_node::generate_intercepted_request_task(message_ex *re return t; } -service_node::~service_node() = default; +service_node::~service_node() +{ + _rpc->stop_serving(); + stop_app(false); + _computation->stop(); +} ////////////////////////////////////////////////////////////////////////////////////////// @@ -188,6 +194,8 @@ service_engine::service_engine() service_engine::~service_engine() { + _nodes_by_app_id.clear(); + UNREGISTER_VALID_HANDLER(_get_runtime_info_cmd); UNREGISTER_VALID_HANDLER(_get_queue_info_cmd); } @@ -218,32 +226,28 @@ void service_engine::init_after_toollets() void service_engine::start_node(service_app_spec &app_spec) { + std::unordered_map app_name_by_port; auto it = _nodes_by_app_id.find(app_spec.id); if (it == _nodes_by_app_id.end()) { for (auto p : app_spec.ports) { // union to existing node if any port is shared - if (_nodes_by_app_port.find(p) != _nodes_by_app_port.end()) { - service_node *n = _nodes_by_app_port[p]; - - dassert(false, - "network port %d usage confliction for %s vs %s, " - "please reconfig", - p, - n->full_name(), - app_spec.full_name.c_str()); + auto it = app_name_by_port.find(p); + if (it != app_name_by_port.end()) { + dassert_f(false, + "network port {} usage confliction for {} vs {}, " + "please reconfig", + p, + it->second, + app_spec.full_name); } + app_name_by_port.emplace(p, app_spec.full_name); } auto node = std::make_shared(app_spec); error_code err = node->start(); - dassert(err == ERR_OK, "service node start failed, err = %s", err.to_string()); + dassert_f(err == ERR_OK, "service node start failed, err = {}", err.to_string()); _nodes_by_app_id[node->id()] = node; - for (auto p1 : node->spec().ports) { - _nodes_by_app_port[p1] = node.get(); - } - - return; } } diff --git a/src/runtime/service_engine.h b/src/runtime/service_engine.h index b16660062f..190e20a6c6 100644 --- a/src/runtime/service_engine.h +++ b/src/runtime/service_engine.h @@ -135,11 +135,8 @@ class service_engine : public utils::singleton bool _simulator; - // - typedef std::map - node_engines_by_port; // multiple ports may share the same node + // map app_id to service_node service_nodes_by_app_id _nodes_by_app_id; - node_engines_by_port _nodes_by_app_port; }; // ------------ inline impl --------------------- diff --git a/src/runtime/task/simple_task_queue.cpp b/src/runtime/task/simple_task_queue.cpp index e84bd4842b..0c05cc46d6 100644 --- a/src/runtime/task/simple_task_queue.cpp +++ b/src/runtime/task/simple_task_queue.cpp @@ -30,12 +30,16 @@ namespace dsn { namespace tools { simple_timer_service::simple_timer_service(service_node *node, timer_service *inner_provider) - : timer_service(node, inner_provider) + : timer_service(node, inner_provider), _is_running(false) { } void simple_timer_service::start() { + if (_is_running) { + return; + } + _worker = std::thread([this]() { task::set_tls_dsn_context(node(), nullptr); @@ -53,12 +57,18 @@ void simple_timer_service::start() false, "io_service in simple_timer_service run failed: %s", ec.message().data()); } }); + _is_running = true; } -simple_timer_service::~simple_timer_service() +void simple_timer_service::stop() { + if (!_is_running) { + return; + } + _ios.stop(); _worker.join(); + _is_running = false; } void simple_timer_service::add_timer(task *task) diff --git a/src/runtime/task/simple_task_queue.h b/src/runtime/task/simple_task_queue.h index 7a8365b4d8..94e15421d0 100644 --- a/src/runtime/task/simple_task_queue.h +++ b/src/runtime/task/simple_task_queue.h @@ -52,16 +52,19 @@ class simple_timer_service : public timer_service public: simple_timer_service(service_node *node, timer_service *inner_provider); - ~simple_timer_service() override; + ~simple_timer_service() override { stop(); } // after milliseconds, the provider should call task->enqueue() virtual void add_timer(task *task) override; virtual void start() override; + virtual void stop() override; + private: boost::asio::io_service _ios; std::thread _worker; + bool _is_running; }; } // namespace tools diff --git a/src/runtime/task/task_engine.cpp b/src/runtime/task/task_engine.cpp index 0bb6bdeb09..69095e6589 100644 --- a/src/runtime/task/task_engine.cpp +++ b/src/runtime/task/task_engine.cpp @@ -24,6 +24,8 @@ * THE SOFTWARE. */ +#include + #include "task_engine.h" using namespace dsn::utils; @@ -78,23 +80,42 @@ void task_worker_pool::start() if (_is_running) return; - for (auto &tsvc : _per_queue_timer_svcs) + for (auto &tsvc : _per_queue_timer_svcs) { tsvc->start(); - for (auto &wk : _workers) + } + for (auto &wk : _workers) { wk->start(); + } - ddebug("[%s] thread pool [%s] started, pool_code = %s, worker_count = %d, worker_share_core = " - "%s, partitioned = %s, ...", - _node->full_name(), - _spec.name.c_str(), - _spec.pool_code.to_string(), - _spec.worker_count, - _spec.worker_share_core ? "true" : "false", - _spec.partitioned ? "true" : "false"); + ddebug_f( + "[{}]: thread pool [{}] started, pool_code = {}, worker_count = {}, worker_share_core = " + "{}, partitioned = {}, ...", + _node->full_name(), + _spec.name, + _spec.pool_code.to_string(), + _spec.worker_count, + _spec.worker_share_core ? "true" : "false", + _spec.partitioned ? "true" : "false"); _is_running = true; } +void task_worker_pool::stop() +{ + if (!_is_running) { + return; + } + + for (auto &tsvc : _per_queue_timer_svcs) { + tsvc->stop(); + } + for (auto &wk : _workers) { + wk->stop(); + } + _is_running = false; + ddebug_f("[{}]: thread pool {} stopped", _node->full_name(), _spec.name); +} + void task_worker_pool::add_timer(task *t) { dassert(t->delay_milliseconds() > 0, @@ -213,8 +234,22 @@ void task_engine::start() if (pl) pl->start(); } - _is_running = true; + ddebug_f("[{}]: task engine started", _node->full_name()); +} + +void task_engine::stop() +{ + if (!_is_running) { + return; + } + + for (auto &pl : _pools) { + if (pl) + pl->stop(); + } + _is_running = false; + ddebug_f("[{}]: task engine stopped", _node->full_name()); } volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code, int hash) diff --git a/src/runtime/task/task_engine.h b/src/runtime/task/task_engine.h index 2873a56b69..8c2eb17c87 100644 --- a/src/runtime/task/task_engine.h +++ b/src/runtime/task/task_engine.h @@ -58,6 +58,7 @@ class task_worker_pool // service management void create(); void start(); + void stop(); // task procecessing void enqueue(task *task); @@ -95,12 +96,14 @@ class task_engine { public: task_engine(service_node *node); + ~task_engine() { stop(); } // // service management routines // void create(const std::list &pools); void start(); + void stop(); // // task management routines diff --git a/src/runtime/task/task_engine.sim.h b/src/runtime/task/task_engine.sim.h index 1e17d19612..4dd54e0197 100644 --- a/src/runtime/task/task_engine.sim.h +++ b/src/runtime/task/task_engine.sim.h @@ -55,6 +55,8 @@ class sim_timer_service : public timer_service virtual void add_timer(task *task) override; virtual void start() override {} + + virtual void stop() override {} }; class sim_task_queue : public task_queue