From 9ed3b355fbc95fcc818f842e5b76fd930136f6c7 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Wed, 16 Mar 2022 02:05:55 +0800 Subject: [PATCH 1/6] 1.add metrics of calldata&mpptunnel 2.refine shutdown logic Signed-off-by: bestwoody --- dbms/src/Common/TiFlashMetrics.h | 3 +++ dbms/src/Flash/EstablishCall.cpp | 27 ++++++++++++++++++++++++--- dbms/src/Flash/EstablishCall.h | 4 ++++ dbms/src/Flash/Mpp/MPPTunnel.cpp | 7 +++++++ dbms/src/Flash/Mpp/MPPTunnel.h | 7 +++++++ dbms/src/Server/Server.cpp | 12 ++++++------ 6 files changed, 51 insertions(+), 9 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 682d9c93470..9a57e2b1c5c 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -169,6 +169,9 @@ namespace DB F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \ M(tiflash_server_info, "Indicate the tiflash server info, and the value is the start timestamp (s).", Gauge, \ F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \ + M(tiflash_object_count, "Number of objects", Gauge, \ + F(type_count_of_establish_calldata, {"type", "count_of_establish_calldata"}), \ + F(type_count_of_mpptunnel, {"type", "count_of_mpptunnel"})) \ M(tiflash_thread_count, "Number of threads", Gauge, \ F(type_max_threads_of_thdpool, {"type", "thread_pool_total_max"}), \ F(type_active_threads_of_thdpool, {"type", "thread_pool_active"}), \ diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 04080530d58..e751b447343 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -26,12 +27,20 @@ EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCo , responder(&ctx) , state(NEW_REQUEST) { + GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Increment(); // As part of the initial CREATE state, we *request* that the system // start processing requests. In this request, "this" acts are // the tag uniquely identifying the request. service->RequestEstablishMPPConnection(&ctx, &request, &responder, cq, notify_cq, this); } +EstablishCallData::~EstablishCallData() +{ + SCOPE_EXIT({ + GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Decrement(); + }); +} + EstablishCallData * EstablishCallData::spawn(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown) { return new EstablishCallData(service, cq, notify_cq, is_shutdown); @@ -53,7 +62,9 @@ void EstablishCallData::tryFlushOne() void EstablishCallData::responderFinish(const grpc::Status & status) { - if (!(*is_shutdown)) + if ((*is_shutdown)) + finishTunnelAndResponder(); + else responder.Finish(status, this); } @@ -79,7 +90,10 @@ void EstablishCallData::initRpc() bool EstablishCallData::write(const mpp::MPPDataPacket & packet) { if (*is_shutdown) - return false; + { + finishTunnelAndResponder(); + return true; + } responder.Write(packet, this); return true; } @@ -116,11 +130,18 @@ void EstablishCallData::cancel() delete this; return; } + finishTunnelAndResponder(); +} + +void EstablishCallData::finishTunnelAndResponder() +{ state = FINISH; if (mpp_tunnel) + { mpp_tunnel->consumerFinish("grpc writes failed.", true); //trigger mpp tunnel finish work + } grpc::Status status(static_cast(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed."); - responderFinish(status); + responder.Finish(status, this); } void EstablishCallData::proceed() diff --git a/dbms/src/Flash/EstablishCall.h b/dbms/src/Flash/EstablishCall.h index 8aeacf84c39..3b81b9da6c1 100644 --- a/dbms/src/Flash/EstablishCall.h +++ b/dbms/src/Flash/EstablishCall.h @@ -51,6 +51,8 @@ class EstablishCallData : public PacketWriter grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown); + ~EstablishCallData(); + bool write(const mpp::MPPDataPacket & packet) override; void tryFlushOne() override; @@ -79,6 +81,8 @@ class EstablishCallData : public PacketWriter void initRpc(); + void finishTunnelAndResponder(); + void responderFinish(const grpc::Status & status); std::mutex mu; diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index ac8cb33e0fe..12bfd31180c 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -48,12 +48,16 @@ MPPTunnelBase::MPPTunnelBase( , thread_manager(newThreadManager()) , log(getMPPTaskLog(log_, tunnel_id)) { + GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment(); assert(!(is_local && is_async)); } template MPPTunnelBase::~MPPTunnelBase() { + SCOPE_EXIT({ + GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Decrement(); + }); try { { @@ -308,6 +312,9 @@ void MPPTunnelBase::consumerFinish(const String & err_msg, bool need_loc send_queue.finish(); auto rest_work = [this, &err_msg] { + // it's safe to call it multiple times + if (finished && consumer_state.errHasSet()) + return; finished = true; // must call setError in the critical area to keep consistent with `finished` from outside. consumer_state.setError(err_msg); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 0b568cdb385..434c75e8093 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -173,11 +173,18 @@ class MPPTunnelBase : private boost::noncopyable void setError(const String & err_msg) { promise.set_value(err_msg); + err_has_set = true; + } + + bool errHasSet() const + { + return err_has_set; } private: std::promise promise; std::shared_future future; + bool err_has_set = false; }; ConsumerState consumer_state; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 044bf701c2a..14fb681b4c3 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -642,14 +642,14 @@ class Server::FlashGrpcServerHolder ~FlashGrpcServerHolder() { - *is_shutdown = true; - const int wait_calldata_after_shutdown_interval_ms = 500; - std::this_thread::sleep_for(std::chrono::milliseconds(wait_calldata_after_shutdown_interval_ms)); // sleep 500ms to let operations of calldata called by MPPTunnel done. /// Shut down grpc server. - // wait 5 seconds for pending rpcs to gracefully stop - gpr_timespec deadline{5, 0, GPR_TIMESPAN}; LOG_FMT_INFO(log, "Begin to shut down flash grpc server"); - flash_grpc_server->Shutdown(deadline); + flash_grpc_server->Shutdown(); + *is_shutdown = true; + // Wait all existed MPPTunnels done to prevent crash. + // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. + while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) + usleep(1000000); // sleep 1s for (auto & cq : cqs) cq->Shutdown(); for (auto & cq : notify_cqs) From c7837f9fdb535a306534f9becbc4bf652f907738 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Wed, 16 Mar 2022 02:28:43 +0800 Subject: [PATCH 2/6] update --- dbms/src/Flash/Mpp/MPPTunnel.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 434c75e8093..cdf4b382e09 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -178,13 +178,13 @@ class MPPTunnelBase : private boost::noncopyable bool errHasSet() const { - return err_has_set; + return err_has_set.load(); } private: std::promise promise; std::shared_future future; - bool err_has_set = false; + std::atomic err_has_set{false}; }; ConsumerState consumer_state; From 9213ab56345235af512ba84cea933dfd9ba8d43b Mon Sep 17 00:00:00 2001 From: bestwoody <89765764+bestwoody@users.noreply.github.com> Date: Wed, 16 Mar 2022 10:51:15 +0800 Subject: [PATCH 3/6] Apply suggestions from code review Co-authored-by: Fu Zhe --- dbms/src/Flash/EstablishCall.cpp | 2 +- dbms/src/Server/Server.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index e751b447343..0189cbb43ef 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -62,7 +62,7 @@ void EstablishCallData::tryFlushOne() void EstablishCallData::responderFinish(const grpc::Status & status) { - if ((*is_shutdown)) + if (*is_shutdown) finishTunnelAndResponder(); else responder.Finish(status, this); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 14fb681b4c3..93a8b242fd8 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -649,7 +649,7 @@ class Server::FlashGrpcServerHolder // Wait all existed MPPTunnels done to prevent crash. // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) - usleep(1000000); // sleep 1s + std::this_thread::sleep_for(std::chrono::seconds(1)); for (auto & cq : cqs) cq->Shutdown(); for (auto & cq : notify_cqs) From 8e6913633a9af8bf89b3266d073ff40a2f46fb62 Mon Sep 17 00:00:00 2001 From: bestwoody <89765764+bestwoody@users.noreply.github.com> Date: Wed, 16 Mar 2022 10:56:41 +0800 Subject: [PATCH 4/6] Update dbms/src/Flash/EstablishCall.cpp Co-authored-by: Fu Zhe --- dbms/src/Flash/EstablishCall.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 0189cbb43ef..cf9d2e2ed6f 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -36,9 +36,7 @@ EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCo EstablishCallData::~EstablishCallData() { - SCOPE_EXIT({ - GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Decrement(); - }); + GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Decrement(); } EstablishCallData * EstablishCallData::spawn(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown) From 65fd6c21216fd8a65ba457c835d833bc5f2b00b5 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Wed, 16 Mar 2022 12:14:56 +0800 Subject: [PATCH 5/6] add harm limit to wait Signed-off-by: bestwoody --- dbms/src/Server/Server.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 14fb681b4c3..6e1c3dba0ce 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -648,8 +648,11 @@ class Server::FlashGrpcServerHolder *is_shutdown = true; // Wait all existed MPPTunnels done to prevent crash. // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. - while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) + const int max_wait_cnt = 300; + int wait_cnt = 0; + while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) usleep(1000000); // sleep 1s + for (auto & cq : cqs) cq->Shutdown(); for (auto & cq : notify_cqs) From ad3601d902382c0b0ed67dc99d438cedb6b26cb4 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Wed, 16 Mar 2022 12:43:08 +0800 Subject: [PATCH 6/6] fix Signed-off-by: bestwoody --- dbms/src/Flash/Mpp/MPPTunnel.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 12bfd31180c..99c7b296355 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -48,8 +48,8 @@ MPPTunnelBase::MPPTunnelBase( , thread_manager(newThreadManager()) , log(getMPPTaskLog(log_, tunnel_id)) { - GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment(); assert(!(is_local && is_async)); + GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment(); } template