Skip to content

Commit

Permalink
Refactor Selectables. Add support for selectable priorities (#192)
Browse files Browse the repository at this point in the history
* Refactor select and selectable. Now they support priority

* Remove fd parameter from Select.select(). This parameter was never used

* Use references as loop variables

* Make select fair for selectables.

If we have selectables with equal priority,
select will choose a selectable which was chosen before
all others. So one selectable can't block others

This commit also contains typo fix

* Revert "Use references as loop variables"

This reverts commit bac891b.

* Revert performance changes

* Address comments

* Make the comparator comment clearer

* Use right EINTR error code for netlink function

* Move Selectable comparator to the Select class as a private functional object, to avoid it using by others

* Hide m_last_used_time from all classes except Select

* Rename some class members

* Make more consistent naming for LastUsedTime of selectable
  • Loading branch information
pavel-shirshov authored and lguohan committed Apr 16, 2018
1 parent 947c86d commit 1ef337a
Show file tree
Hide file tree
Showing 33 changed files with 673 additions and 360 deletions.
4 changes: 2 additions & 2 deletions common/consumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

namespace swss {

ConsumerStateTable::ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize)
: ConsumerTableBase(db, tableName, popBatchSize)
ConsumerStateTable::ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeySet(tableName)
{
for (;;)
Expand Down
2 changes: 1 addition & 1 deletion common/consumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace swss {
class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet
{
public:
ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE);
ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, std::string prefix = EMPTY_PREFIX);
Expand Down
4 changes: 2 additions & 2 deletions common/consumertable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ using namespace std;

namespace swss {

ConsumerTable::ConsumerTable(DBConnector *db, string tableName, int popBatchSize)
: ConsumerTableBase(db, tableName, popBatchSize)
ConsumerTable::ConsumerTable(DBConnector *db, string tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeyValueOpQueues(tableName)
{
for (;;)
Expand Down
2 changes: 1 addition & 1 deletion common/consumertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace swss {
class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueues
{
public:
ConsumerTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE);
ConsumerTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, std::string prefix = EMPTY_PREFIX);
Expand Down
4 changes: 2 additions & 2 deletions common/consumertablebase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace swss {

ConsumerTableBase::ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize):
TableConsumable(db->getDbId(), tableName),
ConsumerTableBase::ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize, int pri):
TableConsumable(db->getDbId(), tableName, pri),
RedisTransactioner(db),
POP_BATCH_SIZE(popBatchSize)
{
Expand Down
2 changes: 1 addition & 1 deletion common/consumertablebase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ConsumerTableBase: public TableConsumable, public RedisTransactioner
public:
const int POP_BATCH_SIZE;

ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE);
ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

virtual ~ConsumerTableBase();

Expand Down
3 changes: 1 addition & 2 deletions common/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ Logger::Priority Logger::getMinPrio()

while(true)
{
int fd = 0;
Selectable *selectable = nullptr;

int ret = select.select(&selectable, &fd);
int ret = select.select(&selectable);

if (ret == Select::ERROR)
{
Expand Down
25 changes: 11 additions & 14 deletions common/netlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
using namespace swss;
using namespace std;

NetLink::NetLink() :
m_socket(NULL)
NetLink::NetLink(int pri) :
Selectable(pri), m_socket(NULL)
{
m_socket = nl_socket_alloc();
if (!m_socket)
Expand Down Expand Up @@ -71,24 +71,21 @@ void NetLink::dumpRequest(int rtmGetCommand)
}
}

void NetLink::addFd(fd_set *fd)
int NetLink::getFd()
{
FD_SET(nl_socket_get_fd(m_socket), fd);
return nl_socket_get_fd(m_socket);
}

bool NetLink::isMe(fd_set *fd)
void NetLink::readData()
{
return FD_ISSET(nl_socket_get_fd(m_socket), fd);
}
int err;

int NetLink::readCache()
{
return NODATA;
}
do
{
err = nl_recvmsgs_default(m_socket);
}
while(err == -NLE_INTR); // Retry if the process was interrupted by a signal

void NetLink::readMe()
{
int err = nl_recvmsgs_default(m_socket);
if (err < 0)
{
if (err == -NLE_NOMEM)
Expand Down
8 changes: 3 additions & 5 deletions common/netlink.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ namespace swss {

class NetLink : public Selectable {
public:
NetLink();
NetLink(int pri = 0);
virtual ~NetLink();

void registerGroup(int rtnlGroup);
void dumpRequest(int rtmGetCommand);

virtual void addFd(fd_set *fd);
virtual bool isMe(fd_set *fd);
virtual int readCache();
virtual void readMe();
int getFd() override;
void readData() override;

private:
static int onNetlinkMsg(struct nl_msg *msg, void *arg);
Expand Down
98 changes: 45 additions & 53 deletions common/notificationconsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#define REDIS_PUBLISH_MESSAGE_INDEX (2)
#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3)

swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel):
swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel, int pri):
Selectable(pri),
m_db(db),
m_subscribe(NULL),
m_channel(channel)
Expand Down Expand Up @@ -56,11 +57,51 @@ void swss::NotificationConsumer::subscribe()
SWSS_LOG_INFO("subscribed to %s", m_channel.c_str());
}

void swss::NotificationConsumer::addFd(fd_set *fd)
int swss::NotificationConsumer::getFd()
{
return m_subscribe->getContext()->fd;
}

void swss::NotificationConsumer::readData()
{
SWSS_LOG_ENTER();

FD_SET(m_subscribe->getContext()->fd, fd);
redisReply *reply = nullptr;

if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

throw std::runtime_error("Unable to read redis reply");
}
else
{
RedisReply r(reply);
processReply(reply);
}

reply = nullptr;
int status;
do
{
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
RedisReply r(reply);
processReply(reply);
}
}
while(reply != nullptr && status == REDIS_OK);

if (status != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}
}

bool swss::NotificationConsumer::hasCachedData()
{
return m_queue.size() > 1;
}

void swss::NotificationConsumer::processReply(redisReply *reply)
Expand Down Expand Up @@ -91,60 +132,11 @@ void swss::NotificationConsumer::processReply(redisReply *reply)
m_queue.push(msg);
}

int swss::NotificationConsumer::readCache()
{
SWSS_LOG_ENTER();

if (m_queue.size() > 0)
{
return Selectable::DATA;
}

redisReply *reply = NULL;

if (redisGetReplyFromReader(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

return Selectable::ERROR;
}
else if (reply != NULL)
{
RedisReply r(reply);
processReply(reply);
return Selectable::DATA;
}

return Selectable::NODATA;
}

void swss::NotificationConsumer::readMe()
{
SWSS_LOG_ENTER();

redisReply *reply = NULL;

if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

throw std::runtime_error("Unable to read redis reply");
}

RedisReply r(reply);
processReply(reply);
}

bool swss::NotificationConsumer::isMe(fd_set *fd)
{
return FD_ISSET(m_subscribe->getContext()->fd, fd);
}

void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values)
{
SWSS_LOG_ENTER();

if (m_queue.size() == 0)
if (m_queue.empty())
{
SWSS_LOG_ERROR("notification queue is empty, can't pop");
throw std::runtime_error("notification queue is empty, can't pop");
Expand Down
9 changes: 4 additions & 5 deletions common/notificationconsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ namespace swss {
class NotificationConsumer : public Selectable
{
public:
NotificationConsumer(swss::DBConnector *db, std::string channel);
NotificationConsumer(swss::DBConnector *db, std::string channel, int pri = 100);

void pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values);

virtual ~NotificationConsumer();

virtual void addFd(fd_set *fd);
virtual bool isMe(fd_set *fd);
virtual int readCache();
virtual void readMe();
int getFd() override;
void readData() override;
bool hasCachedData() override;

private:

Expand Down
1 change: 1 addition & 0 deletions common/rediscommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void RedisCommand::format(const char *fmt, ...)
va_list ap;
va_start(ap, fmt);
int len = redisvFormatCommand(&temp, fmt, ap);
va_end(ap);
if (len == -1) {
throw std::bad_alloc();
} else if (len == -2) {
Expand Down
58 changes: 32 additions & 26 deletions common/redisselect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,57 @@

namespace swss {

RedisSelect::RedisSelect()
RedisSelect::RedisSelect(int pri) : Selectable(pri), m_queueLength(-1)
{
}

void RedisSelect::addFd(fd_set *fd)
int RedisSelect::getFd()
{
FD_SET(m_subscribe->getContext()->fd, fd);
return m_subscribe->getContext()->fd;
}

int RedisSelect::readCache()
void RedisSelect::readData()
{
redisReply *reply = NULL;
redisReply *reply = nullptr;

/* Read the messages in queue before subscribe command execute */
if (m_queueLength) {
m_queueLength--;
return Selectable::DATA;
}
if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
throw std::runtime_error("Unable to read redis reply");

if (redisGetReplyFromReader(m_subscribe->getContext(),
(void**)&reply) != REDIS_OK)
{
return Selectable::ERROR;
} else if (reply != NULL)
freeReplyObject(reply);
m_queueLength++;

reply = nullptr;
int status;
do
{
freeReplyObject(reply);
return Selectable::DATA;
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
m_queueLength++;
freeReplyObject(reply);
}
}
while(reply != nullptr && status == REDIS_OK);

return Selectable::NODATA;
if (status != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}
}

void RedisSelect::readMe()
bool RedisSelect::hasCachedData()
{
redisReply *reply = NULL;

if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
throw "Unable to read redis reply";
return m_queueLength > 1;
}

freeReplyObject(reply);
bool RedisSelect::initializedWithData()
{
return m_queueLength > 0;
}

bool RedisSelect::isMe(fd_set *fd)
void RedisSelect::updateAfterRead()
{
return FD_ISSET(m_subscribe->getContext()->fd, fd);
m_queueLength--;
}

/* Create a new redisContext, SELECT DB and SUBSCRIBE */
Expand Down
14 changes: 6 additions & 8 deletions common/redisselect.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ class RedisSelect : public Selectable
/* The database is already alive and kicking, no need for more than a second */
static constexpr unsigned int SUBSCRIBE_TIMEOUT = 1000;

RedisSelect();
RedisSelect(int pri = 0);

void addFd(fd_set *fd);

int readCache();

void readMe();

bool isMe(fd_set *fd);
int getFd() override;
void readData() override;
bool hasCachedData() override;
bool initializedWithData() override;
void updateAfterRead() override;

/* Create a new redisContext, SELECT DB and SUBSCRIBE */
void subscribe(DBConnector* db, std::string channelName);
Expand Down
Loading

0 comments on commit 1ef337a

Please sign in to comment.