Skip to content

Commit

Permalink
Fix SIGTERM can't terminate PubSub::listen issue by add cancellation …
Browse files Browse the repository at this point in the history
…token support. (#606)

Why I did it
    There are infinite loops inside PubSub::listen() method, so application using this method can't handle SIGTERM correctly.
    #603

How I did it
    Add following class:
    1. CancellationToken: this class will help exist the infinite loops when SIGTERM or other signal happen.
    2. SignalHandlerHelper: Provide a native signal handler.

How to verify it
   1. manually test.
   2. Pass all test case.
  • Loading branch information
liuh-80 committed May 19, 2022
1 parent a90b2b7 commit 7ae22be
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 10 deletions.
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ libswsscommon_la_SOURCES = \
select.cpp \
selectableevent.cpp \
selectabletimer.cpp \
signalhandlerhelper.cpp \
consumertable.cpp \
consumertablebase.cpp \
consumerstatetable.cpp \
Expand Down
6 changes: 5 additions & 1 deletion common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
if init_data_handler:
init_data_handler(init_callback_data)

while True:
while not SignalHandlerHelper.checkSignal(SIGNAL_INT):
item = self.pubsub.listen_message()
if 'type' not in item:
# When timeout or cancelled, item will not contains 'type'
continue

if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
Expand Down
28 changes: 20 additions & 8 deletions common/pubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,29 @@ bool PubSub::hasCachedData()

map<string, string> PubSub::get_message(double timeout)
{
map<string, string> ret;
return get_message_internal(timeout).second;
}

MessageResultPair PubSub::get_message_internal(double timeout)
{
MessageResultPair ret;

if (!m_subscribe)
{
ret.first = Select::ERROR;
return ret;
}

Selectable *selected;
int rc = m_select.select(&selected, int(timeout));
ret.first = rc;
switch (rc)
{
case Select::ERROR:
throw RedisError("Failed to select", m_subscribe->getContext());

case Select::TIMEOUT:
case Select::SIGNALINT:
return ret;

case Select::OBJECT:
Expand All @@ -110,10 +119,10 @@ map<string, string> PubSub::get_message(double timeout)
}

auto message = event->getReply<RedisMessage>();
ret["type"] = message.type;
ret["pattern"] = message.pattern;
ret["channel"] = message.channel;
ret["data"] = message.data;
ret.second["type"] = message.type;
ret.second["pattern"] = message.pattern;
ret.second["channel"] = message.channel;
ret.second["data"] = message.data;
return ret;
}

Expand All @@ -122,14 +131,17 @@ map<string, string> PubSub::get_message(double timeout)
std::map<std::string, std::string> PubSub::listen_message()
{
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
MessageResultPair ret;
for (;;)
{
auto ret = get_message(GET_MESSAGE_INTERVAL);
if (!ret.empty())
ret = get_message_internal(GET_MESSAGE_INTERVAL);
if (!ret.second.empty() || ret.first == Select::SIGNALINT)
{
return ret;
break;
}
}

return ret.second;
}

shared_ptr<RedisReply> PubSub::popEventBuffer()
Expand Down
4 changes: 4 additions & 0 deletions common/pubsub.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#pragma once
#include <map>
#include <deque>
#include <utility>

#include "dbconnector.h"
#include "select.h"
#include "redisselect.h"

namespace swss {

typedef std::pair<int, std::map<std::string, std::string> > MessageResultPair;

// This class is to emulate python redis-py class PubSub
// After SWIG wrapping, it should be used in the same way
class PubSub : protected RedisSelect
Expand All @@ -29,6 +32,7 @@ class PubSub : protected RedisSelect
private:
/* Pop keyspace event from event buffer. Caller should free resources. */
std::shared_ptr<RedisReply> popEventBuffer();
MessageResultPair get_message_internal(double timeout = 0.0);

DBConnector *m_parentConnector;
Select m_select;
Expand Down
13 changes: 12 additions & 1 deletion common/select.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common/selectable.h"
#include "common/logger.h"
#include "common/select.h"
#include "common/signalhandlerhelper.h"
#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
Expand All @@ -9,6 +10,7 @@
#include <unistd.h>
#include <string.h>


using namespace std;

namespace swss {
Expand Down Expand Up @@ -97,7 +99,13 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
{
ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
}
while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal
while(ret == -1 && errno == EINTR && !SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)); // Retry the select if the process was interrupted by a signal

if (SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT))
{
// Return if the epoll_wait was interrupted by SIGTERM
return Select::SIGNALINT;
}

if (ret < 0)
return Select::ERROR;
Expand Down Expand Up @@ -190,6 +198,9 @@ std::string Select::resultToString(int result)
case swss::Select::TIMEOUT:
return "TIMEOUT";

case swss::Select::SIGNALINT:
return "SIGNALINT";

default:
SWSS_LOG_WARN("unknown select result: %d", result);
return "UNKNOWN";
Expand Down
1 change: 1 addition & 0 deletions common/select.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Select
OBJECT = 0,
ERROR = 1,
TIMEOUT = 2,
SIGNALINT = 3,// Read operation interrupted by SIGINT
};

int select(Selectable **c, int timeout = -1);
Expand Down
70 changes: 70 additions & 0 deletions common/signalhandlerhelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include <signal.h>
#include "common/logger.h"
#include "signalhandlerhelper.h"

using namespace swss;

std::map<int, bool> SignalHandlerHelper::m_signalStatusMapping;
std::map<int, SigActionPair> SignalHandlerHelper::m_sigActionMapping;

void SignalHandlerHelper::registerSignalHandler(int signalNumber)
{
auto result = m_sigActionMapping.find(signalNumber);
if (result != m_sigActionMapping.end())
{
// signal action already registered
SWSS_LOG_WARN("sigaction for %d already registered.", signalNumber);
return;
}

m_signalStatusMapping[signalNumber] = false;

SigActionPair sig_action_pair;
auto *new_action = &sig_action_pair.first;
auto *old_action = &sig_action_pair.second;

new_action->sa_handler = SignalHandlerHelper::onSignal;
sigemptyset(&new_action->sa_mask);
new_action->sa_flags = 0;

// always replace old action even old action is ignore signal
sigaction(signalNumber, new_action, old_action);

m_sigActionMapping[signalNumber] = sig_action_pair;
}

void SignalHandlerHelper::restoreSignalHandler(int signalNumber)
{
auto result = m_sigActionMapping.find(signalNumber);
if (result == m_sigActionMapping.end())
{
// signal action does not registered
SWSS_LOG_WARN("sigaction for %d does not registered.",signalNumber);
return;
}

auto *old_action = &result->second.second;

sigaction(signalNumber, old_action, NULL);
}

void SignalHandlerHelper::onSignal(int signalNumber)
{
m_signalStatusMapping[signalNumber] = true;
}

bool SignalHandlerHelper::checkSignal(int signalNumber)
{
auto result = m_signalStatusMapping.find(signalNumber);
if (result != m_signalStatusMapping.end())
{
return result->second;
}

return false;
}

void SignalHandlerHelper::resetSignal(int signalNumber)
{
m_signalStatusMapping[signalNumber] = false;
}
36 changes: 36 additions & 0 deletions common/signalhandlerhelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once
#include <map>
#include <signal.h>

namespace swss {

typedef std::pair<struct sigaction, struct sigaction> SigActionPair;

// Define signal ID enum for python
enum Signals
{
SIGNAL_TERM = SIGTERM,
SIGNAL_INT = SIGINT
};

/*
SignalHandlerHelper class provide a native signal handler.
Python signal handler have following issue:
A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.
For more information, please check: https://docs.python.org/3/library/signal.html
*/
class SignalHandlerHelper
{
public:
static void registerSignalHandler(int signalNumber);
static void restoreSignalHandler(int signalNumber);
static void onSignal(int signalNumber);
static bool checkSignal(int signalNumber);
static void resetSignal(int signalNumber);

private:
static std::map<int, bool> m_signalStatusMapping;
static std::map<int, SigActionPair> m_sigActionMapping;
};

}
2 changes: 2 additions & 0 deletions pyext/swsscommon.i
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "pubsub.h"
#include "select.h"
#include "selectable.h"
#include "signalhandlerhelper.h"
#include "rediscommand.h"
#include "table.h"
#include "redispipeline.h"
Expand Down Expand Up @@ -154,6 +155,7 @@ T castSelectableObj(swss::Selectable *temp)
%include "pubsub.h"
%include "selectable.h"
%include "select.h"
%include "signalhandlerhelper.h"
%include "rediscommand.h"
%include "redispipeline.h"
%include "redisselect.h"
Expand Down
33 changes: 33 additions & 0 deletions tests/test_signalhandler_ut.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import signal
import os
import pytest
from swsscommon import swsscommon
from swsscommon.swsscommon import SignalHandlerHelper

def dummy_signal_handler(signum, stack):
# ignore signal so UT will not break
pass

def test_SignalHandler():
signal.signal(signal.SIGUSR1, dummy_signal_handler)

# Register SIGUSER1
SignalHandlerHelper.registerSignalHandler(signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

# trigger SIGUSER manually
os.kill(os.getpid(), signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == True

# Reset signal
SignalHandlerHelper.resetSignal(signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

# un-register signal handler
SignalHandlerHelper.restoreSignalHandler(signal.SIGUSR1)
os.kill(os.getpid(), signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

0 comments on commit 7ae22be

Please sign in to comment.