Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Selectables. Add support for selectable priorities #192

Merged
merged 16 commits into from
Apr 16, 2018
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/consumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

namespace swss {

ConsumerStateTable::ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize)
: ConsumerTableBase(db, tableName, popBatchSize)
ConsumerStateTable::ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeySet(tableName)
{
for (;;)
Expand Down
2 changes: 1 addition & 1 deletion common/consumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace swss {
class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet
{
public:
ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE);
ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, std::string prefix = EMPTY_PREFIX);
Expand Down
4 changes: 2 additions & 2 deletions common/consumertable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ using namespace std;

namespace swss {

ConsumerTable::ConsumerTable(DBConnector *db, string tableName, int popBatchSize)
: ConsumerTableBase(db, tableName, popBatchSize)
ConsumerTable::ConsumerTable(DBConnector *db, string tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeyValueOpQueues(tableName)
{
for (;;)
Expand Down
2 changes: 1 addition & 1 deletion common/consumertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace swss {
class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueues
{
public:
ConsumerTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE);
ConsumerTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, std::string prefix = EMPTY_PREFIX);
Expand Down
4 changes: 2 additions & 2 deletions common/consumertablebase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace swss {

ConsumerTableBase::ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize):
TableConsumable(db->getDbId(), tableName),
ConsumerTableBase::ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize, int pri):
TableConsumable(db->getDbId(), tableName, pri),
RedisTransactioner(db),
POP_BATCH_SIZE(popBatchSize)
{
Expand Down
2 changes: 1 addition & 1 deletion common/consumertablebase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ConsumerTableBase: public TableConsumable, public RedisTransactioner
public:
const int POP_BATCH_SIZE;

ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE);
ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

virtual ~ConsumerTableBase();

Expand Down
3 changes: 1 addition & 2 deletions common/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ Logger::Priority Logger::getMinPrio()

while(true)
{
int fd = 0;
Selectable *selectable = nullptr;

int ret = select.select(&selectable, &fd);
int ret = select.select(&selectable);

if (ret == Select::ERROR)
{
Expand Down
25 changes: 11 additions & 14 deletions common/netlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
using namespace swss;
using namespace std;

NetLink::NetLink() :
m_socket(NULL)
NetLink::NetLink(int pri) :
Selectable(pri), m_socket(NULL)
{
m_socket = nl_socket_alloc();
if (!m_socket)
Expand Down Expand Up @@ -71,24 +71,21 @@ void NetLink::dumpRequest(int rtmGetCommand)
}
}

void NetLink::addFd(fd_set *fd)
int NetLink::getFd()
{
FD_SET(nl_socket_get_fd(m_socket), fd);
return nl_socket_get_fd(m_socket);
}

bool NetLink::isMe(fd_set *fd)
void NetLink::readData()
{
return FD_ISSET(nl_socket_get_fd(m_socket), fd);
}
int err;

int NetLink::readCache()
{
return NODATA;
}
do
{
err = nl_recvmsgs_default(m_socket);
}
while(err == -NLE_INTR); // Retry if the process was interrupted by a signal

void NetLink::readMe()
{
int err = nl_recvmsgs_default(m_socket);
if (err < 0)
{
if (err == -NLE_NOMEM)
Expand Down
8 changes: 3 additions & 5 deletions common/netlink.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ namespace swss {

class NetLink : public Selectable {
public:
NetLink();
NetLink(int pri = 0);
virtual ~NetLink();

void registerGroup(int rtnlGroup);
void dumpRequest(int rtmGetCommand);

virtual void addFd(fd_set *fd);
virtual bool isMe(fd_set *fd);
virtual int readCache();
virtual void readMe();
int getFd() override;
void readData() override;

private:
static int onNetlinkMsg(struct nl_msg *msg, void *arg);
Expand Down
98 changes: 45 additions & 53 deletions common/notificationconsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#define REDIS_PUBLISH_MESSAGE_INDEX (2)
#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3)

swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel):
swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel, int pri):
Selectable(pri),
m_db(db),
m_subscribe(NULL),
m_channel(channel)
Expand Down Expand Up @@ -56,11 +57,51 @@ void swss::NotificationConsumer::subscribe()
SWSS_LOG_INFO("subscribed to %s", m_channel.c_str());
}

void swss::NotificationConsumer::addFd(fd_set *fd)
int swss::NotificationConsumer::getFd()
{
return m_subscribe->getContext()->fd;
}

void swss::NotificationConsumer::readData()
{
SWSS_LOG_ENTER();

FD_SET(m_subscribe->getContext()->fd, fd);
redisReply *reply = nullptr;

if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

throw std::runtime_error("Unable to read redis reply");
}
else
{
RedisReply r(reply);
processReply(reply);
}

reply = nullptr;
int status;
do
{
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
RedisReply r(reply);
processReply(reply);
}
}
while(reply != nullptr && status == REDIS_OK);

if (status != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}
}

bool swss::NotificationConsumer::hasCachedData()
{
return m_queue.size() > 1;
}

void swss::NotificationConsumer::processReply(redisReply *reply)
Expand Down Expand Up @@ -91,60 +132,11 @@ void swss::NotificationConsumer::processReply(redisReply *reply)
m_queue.push(msg);
}

int swss::NotificationConsumer::readCache()
{
SWSS_LOG_ENTER();

if (m_queue.size() > 0)
{
return Selectable::DATA;
}

redisReply *reply = NULL;

if (redisGetReplyFromReader(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

return Selectable::ERROR;
}
else if (reply != NULL)
{
RedisReply r(reply);
processReply(reply);
return Selectable::DATA;
}

return Selectable::NODATA;
}

void swss::NotificationConsumer::readMe()
{
SWSS_LOG_ENTER();

redisReply *reply = NULL;

if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());

throw std::runtime_error("Unable to read redis reply");
}

RedisReply r(reply);
processReply(reply);
}

bool swss::NotificationConsumer::isMe(fd_set *fd)
{
return FD_ISSET(m_subscribe->getContext()->fd, fd);
}

void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values)
{
SWSS_LOG_ENTER();

if (m_queue.size() == 0)
if (m_queue.empty())
{
SWSS_LOG_ERROR("notification queue is empty, can't pop");
throw std::runtime_error("notification queue is empty, can't pop");
Expand Down
9 changes: 4 additions & 5 deletions common/notificationconsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ namespace swss {
class NotificationConsumer : public Selectable
{
public:
NotificationConsumer(swss::DBConnector *db, std::string channel);
NotificationConsumer(swss::DBConnector *db, std::string channel, int pri = 100);

void pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values);

virtual ~NotificationConsumer();

virtual void addFd(fd_set *fd);
virtual bool isMe(fd_set *fd);
virtual int readCache();
virtual void readMe();
int getFd() override;
void readData() override;
bool hasCachedData() override;

private:

Expand Down
1 change: 1 addition & 0 deletions common/rediscommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void RedisCommand::format(const char *fmt, ...)
va_list ap;
va_start(ap, fmt);
int len = redisvFormatCommand(&temp, fmt, ap);
va_end(ap);
if (len == -1) {
throw std::bad_alloc();
} else if (len == -2) {
Expand Down
58 changes: 32 additions & 26 deletions common/redisselect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,57 @@

namespace swss {

RedisSelect::RedisSelect()
RedisSelect::RedisSelect(int pri) : Selectable(pri), m_queueLength(-1)
{
}

void RedisSelect::addFd(fd_set *fd)
int RedisSelect::getFd()
{
FD_SET(m_subscribe->getContext()->fd, fd);
return m_subscribe->getContext()->fd;
}

int RedisSelect::readCache()
void RedisSelect::readData()
{
redisReply *reply = NULL;
redisReply *reply = nullptr;

/* Read the messages in queue before subscribe command execute */
if (m_queueLength) {
m_queueLength--;
return Selectable::DATA;
}
if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
throw std::runtime_error("Unable to read redis reply");

if (redisGetReplyFromReader(m_subscribe->getContext(),
(void**)&reply) != REDIS_OK)
{
return Selectable::ERROR;
} else if (reply != NULL)
freeReplyObject(reply);
m_queueLength++;

reply = nullptr;
int status;
do
{
freeReplyObject(reply);
return Selectable::DATA;
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
m_queueLength++;
freeReplyObject(reply);
}
}
while(reply != nullptr && status == REDIS_OK);

return Selectable::NODATA;
if (status != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}
}

void RedisSelect::readMe()
bool RedisSelect::hasCachedData()
{
redisReply *reply = NULL;

if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
throw "Unable to read redis reply";
return m_queueLength > 1;
}

freeReplyObject(reply);
bool RedisSelect::initializedWithData()
{
return m_queueLength > 0;
}

bool RedisSelect::isMe(fd_set *fd)
void RedisSelect::updateAfterRead()
{
return FD_ISSET(m_subscribe->getContext()->fd, fd);
m_queueLength--;
}

/* Create a new redisContext, SELECT DB and SUBSCRIBE */
Expand Down
14 changes: 6 additions & 8 deletions common/redisselect.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ class RedisSelect : public Selectable
/* The database is already alive and kicking, no need for more than a second */
static constexpr unsigned int SUBSCRIBE_TIMEOUT = 1000;

RedisSelect();
RedisSelect(int pri = 0);

void addFd(fd_set *fd);

int readCache();

void readMe();

bool isMe(fd_set *fd);
int getFd() override;
void readData() override;
bool hasCachedData() override;
bool initializedWithData() override;
void updateAfterRead() override;

/* Create a new redisContext, SELECT DB and SUBSCRIBE */
void subscribe(DBConnector* db, std::string channelName);
Expand Down
Loading