From 7ae22be4c1bf8dd7c5f697cdf2faf7317a02ba14 Mon Sep 17 00:00:00 2001 From: Hua Liu <58683130+liuh-80@users.noreply.github.com> Date: Thu, 19 May 2022 09:36:53 +0800 Subject: [PATCH] Fix SIGTERM can't terminate PubSub::listen issue by add cancellation token support. (#606) Why I did it There are infinite loops inside PubSub::listen() method, so application using this method can't handle SIGTERM correctly. https://github.com/Azure/sonic-swss-common/issues/603 How I did it Add following class: 1. CancellationToken: this class will help exist the infinite loops when SIGTERM or other signal happen. 2. SignalHandlerHelper: Provide a native signal handler. How to verify it 1. manually test. 2. Pass all test case. --- common/Makefile.am | 1 + common/configdb.h | 6 ++- common/pubsub.cpp | 28 ++++++++++---- common/pubsub.h | 4 ++ common/select.cpp | 13 ++++++- common/select.h | 1 + common/signalhandlerhelper.cpp | 70 ++++++++++++++++++++++++++++++++++ common/signalhandlerhelper.h | 36 +++++++++++++++++ pyext/swsscommon.i | 2 + tests/test_signalhandler_ut.py | 33 ++++++++++++++++ 10 files changed, 184 insertions(+), 10 deletions(-) create mode 100644 common/signalhandlerhelper.cpp create mode 100644 common/signalhandlerhelper.h create mode 100755 tests/test_signalhandler_ut.py diff --git a/common/Makefile.am b/common/Makefile.am index 4c400556d..53f8c87ad 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -44,6 +44,7 @@ libswsscommon_la_SOURCES = \ select.cpp \ selectableevent.cpp \ selectabletimer.cpp \ + signalhandlerhelper.cpp \ consumertable.cpp \ consumertablebase.cpp \ consumerstatetable.cpp \ diff --git a/common/configdb.h b/common/configdb.h index d41a49cec..e1f82b454 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -94,8 +94,12 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native if init_data_handler: init_data_handler(init_callback_data) - while True: + while not SignalHandlerHelper.checkSignal(SIGNAL_INT): item = self.pubsub.listen_message() + if 'type' not in item: + # When timeout or cancelled, item will not contains 'type' + continue + if item['type'] == 'pmessage': key = item['channel'].split(':', 1)[1] try: diff --git a/common/pubsub.cpp b/common/pubsub.cpp index 02f5702f0..0b5fb808c 100644 --- a/common/pubsub.cpp +++ b/common/pubsub.cpp @@ -79,20 +79,29 @@ bool PubSub::hasCachedData() map PubSub::get_message(double timeout) { - map ret; + return get_message_internal(timeout).second; +} + +MessageResultPair PubSub::get_message_internal(double timeout) +{ + MessageResultPair ret; + if (!m_subscribe) { + ret.first = Select::ERROR; return ret; } Selectable *selected; int rc = m_select.select(&selected, int(timeout)); + ret.first = rc; switch (rc) { case Select::ERROR: throw RedisError("Failed to select", m_subscribe->getContext()); case Select::TIMEOUT: + case Select::SIGNALINT: return ret; case Select::OBJECT: @@ -110,10 +119,10 @@ map PubSub::get_message(double timeout) } auto message = event->getReply(); - ret["type"] = message.type; - ret["pattern"] = message.pattern; - ret["channel"] = message.channel; - ret["data"] = message.data; + ret.second["type"] = message.type; + ret.second["pattern"] = message.pattern; + ret.second["channel"] = message.channel; + ret.second["data"] = message.data; return ret; } @@ -122,14 +131,17 @@ map PubSub::get_message(double timeout) std::map PubSub::listen_message() { const double GET_MESSAGE_INTERVAL = 600.0; // in seconds + MessageResultPair ret; for (;;) { - auto ret = get_message(GET_MESSAGE_INTERVAL); - if (!ret.empty()) + ret = get_message_internal(GET_MESSAGE_INTERVAL); + if (!ret.second.empty() || ret.first == Select::SIGNALINT) { - return ret; + break; } } + + return ret.second; } shared_ptr PubSub::popEventBuffer() diff --git a/common/pubsub.h b/common/pubsub.h index 46b414d71..05e8849ad 100644 --- a/common/pubsub.h +++ b/common/pubsub.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include "dbconnector.h" #include "select.h" @@ -8,6 +9,8 @@ namespace swss { +typedef std::pair > MessageResultPair; + // This class is to emulate python redis-py class PubSub // After SWIG wrapping, it should be used in the same way class PubSub : protected RedisSelect @@ -29,6 +32,7 @@ class PubSub : protected RedisSelect private: /* Pop keyspace event from event buffer. Caller should free resources. */ std::shared_ptr popEventBuffer(); + MessageResultPair get_message_internal(double timeout = 0.0); DBConnector *m_parentConnector; Select m_select; diff --git a/common/select.cpp b/common/select.cpp index 0a30bf1c7..9150af874 100644 --- a/common/select.cpp +++ b/common/select.cpp @@ -1,6 +1,7 @@ #include "common/selectable.h" #include "common/logger.h" #include "common/select.h" +#include "common/signalhandlerhelper.h" #include #include #include @@ -9,6 +10,7 @@ #include #include + using namespace std; namespace swss { @@ -97,7 +99,13 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout) { ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout); } - while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal + while(ret == -1 && errno == EINTR && !SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)); // Retry the select if the process was interrupted by a signal + + if (SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)) + { + // Return if the epoll_wait was interrupted by SIGTERM + return Select::SIGNALINT; + } if (ret < 0) return Select::ERROR; @@ -190,6 +198,9 @@ std::string Select::resultToString(int result) case swss::Select::TIMEOUT: return "TIMEOUT"; + case swss::Select::SIGNALINT: + return "SIGNALINT"; + default: SWSS_LOG_WARN("unknown select result: %d", result); return "UNKNOWN"; diff --git a/common/select.h b/common/select.h index 9728c0c9a..2d41721aa 100644 --- a/common/select.h +++ b/common/select.h @@ -30,6 +30,7 @@ class Select OBJECT = 0, ERROR = 1, TIMEOUT = 2, + SIGNALINT = 3,// Read operation interrupted by SIGINT }; int select(Selectable **c, int timeout = -1); diff --git a/common/signalhandlerhelper.cpp b/common/signalhandlerhelper.cpp new file mode 100644 index 000000000..0f1b0f36c --- /dev/null +++ b/common/signalhandlerhelper.cpp @@ -0,0 +1,70 @@ +#include +#include "common/logger.h" +#include "signalhandlerhelper.h" + +using namespace swss; + +std::map SignalHandlerHelper::m_signalStatusMapping; +std::map SignalHandlerHelper::m_sigActionMapping; + +void SignalHandlerHelper::registerSignalHandler(int signalNumber) +{ + auto result = m_sigActionMapping.find(signalNumber); + if (result != m_sigActionMapping.end()) + { + // signal action already registered + SWSS_LOG_WARN("sigaction for %d already registered.", signalNumber); + return; + } + + m_signalStatusMapping[signalNumber] = false; + + SigActionPair sig_action_pair; + auto *new_action = &sig_action_pair.first; + auto *old_action = &sig_action_pair.second; + + new_action->sa_handler = SignalHandlerHelper::onSignal; + sigemptyset(&new_action->sa_mask); + new_action->sa_flags = 0; + + // always replace old action even old action is ignore signal + sigaction(signalNumber, new_action, old_action); + + m_sigActionMapping[signalNumber] = sig_action_pair; +} + +void SignalHandlerHelper::restoreSignalHandler(int signalNumber) +{ + auto result = m_sigActionMapping.find(signalNumber); + if (result == m_sigActionMapping.end()) + { + // signal action does not registered + SWSS_LOG_WARN("sigaction for %d does not registered.",signalNumber); + return; + } + + auto *old_action = &result->second.second; + + sigaction(signalNumber, old_action, NULL); +} + +void SignalHandlerHelper::onSignal(int signalNumber) +{ + m_signalStatusMapping[signalNumber] = true; +} + +bool SignalHandlerHelper::checkSignal(int signalNumber) +{ + auto result = m_signalStatusMapping.find(signalNumber); + if (result != m_signalStatusMapping.end()) + { + return result->second; + } + + return false; +} + +void SignalHandlerHelper::resetSignal(int signalNumber) +{ + m_signalStatusMapping[signalNumber] = false; +} \ No newline at end of file diff --git a/common/signalhandlerhelper.h b/common/signalhandlerhelper.h new file mode 100644 index 000000000..ee7633aaa --- /dev/null +++ b/common/signalhandlerhelper.h @@ -0,0 +1,36 @@ +#pragma once +#include +#include + +namespace swss { + +typedef std::pair SigActionPair; + +// Define signal ID enum for python +enum Signals +{ + SIGNAL_TERM = SIGTERM, + SIGNAL_INT = SIGINT +}; + +/* + SignalHandlerHelper class provide a native signal handler. + Python signal handler have following issue: + A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes. + For more information, please check: https://docs.python.org/3/library/signal.html +*/ +class SignalHandlerHelper +{ +public: + static void registerSignalHandler(int signalNumber); + static void restoreSignalHandler(int signalNumber); + static void onSignal(int signalNumber); + static bool checkSignal(int signalNumber); + static void resetSignal(int signalNumber); + +private: + static std::map m_signalStatusMapping; + static std::map m_sigActionMapping; +}; + +} \ No newline at end of file diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index ebf3e65fd..f3b15cd22 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -19,6 +19,7 @@ #include "pubsub.h" #include "select.h" #include "selectable.h" +#include "signalhandlerhelper.h" #include "rediscommand.h" #include "table.h" #include "redispipeline.h" @@ -154,6 +155,7 @@ T castSelectableObj(swss::Selectable *temp) %include "pubsub.h" %include "selectable.h" %include "select.h" +%include "signalhandlerhelper.h" %include "rediscommand.h" %include "redispipeline.h" %include "redisselect.h" diff --git a/tests/test_signalhandler_ut.py b/tests/test_signalhandler_ut.py new file mode 100755 index 000000000..bed883b5d --- /dev/null +++ b/tests/test_signalhandler_ut.py @@ -0,0 +1,33 @@ +import signal +import os +import pytest +from swsscommon import swsscommon +from swsscommon.swsscommon import SignalHandlerHelper + +def dummy_signal_handler(signum, stack): + # ignore signal so UT will not break + pass + +def test_SignalHandler(): + signal.signal(signal.SIGUSR1, dummy_signal_handler) + + # Register SIGUSER1 + SignalHandlerHelper.registerSignalHandler(signal.SIGUSR1) + happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) + assert happened == False + + # trigger SIGUSER manually + os.kill(os.getpid(), signal.SIGUSR1) + happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) + assert happened == True + + # Reset signal + SignalHandlerHelper.resetSignal(signal.SIGUSR1) + happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) + assert happened == False + + # un-register signal handler + SignalHandlerHelper.restoreSignalHandler(signal.SIGUSR1) + os.kill(os.getpid(), signal.SIGUSR1) + happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) + assert happened == False \ No newline at end of file