diff --git a/common/consumerstatetable.cpp b/common/consumerstatetable.cpp index 3768646c0..15d74d003 100644 --- a/common/consumerstatetable.cpp +++ b/common/consumerstatetable.cpp @@ -11,8 +11,8 @@ namespace swss { -ConsumerStateTable::ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize) - : ConsumerTableBase(db, tableName, popBatchSize) +ConsumerStateTable::ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize, int pri) + : ConsumerTableBase(db, tableName, popBatchSize, pri) , TableName_KeySet(tableName) { for (;;) diff --git a/common/consumerstatetable.h b/common/consumerstatetable.h index bf1f1db7e..2fd954ee8 100644 --- a/common/consumerstatetable.h +++ b/common/consumerstatetable.h @@ -10,7 +10,7 @@ namespace swss { class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet { public: - ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE); + ConsumerStateTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0); /* Get multiple pop elements */ void pops(std::deque &vkco, std::string prefix = EMPTY_PREFIX); diff --git a/common/consumertable.cpp b/common/consumertable.cpp index 995af697b..0ac8bbbb2 100644 --- a/common/consumertable.cpp +++ b/common/consumertable.cpp @@ -14,8 +14,8 @@ using namespace std; namespace swss { -ConsumerTable::ConsumerTable(DBConnector *db, string tableName, int popBatchSize) - : ConsumerTableBase(db, tableName, popBatchSize) +ConsumerTable::ConsumerTable(DBConnector *db, string tableName, int popBatchSize, int pri) + : ConsumerTableBase(db, tableName, popBatchSize, pri) , TableName_KeyValueOpQueues(tableName) { for (;;) diff --git a/common/consumertable.h b/common/consumertable.h index 70a4b7d81..94a00be59 100644 --- a/common/consumertable.h +++ b/common/consumertable.h @@ -13,7 +13,7 @@ namespace swss { class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueues { public: - ConsumerTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE); + ConsumerTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0); /* Get multiple pop elements */ void pops(std::deque &vkco, std::string prefix = EMPTY_PREFIX); diff --git a/common/consumertablebase.cpp b/common/consumertablebase.cpp index c506ccfd6..c82b545fb 100644 --- a/common/consumertablebase.cpp +++ b/common/consumertablebase.cpp @@ -2,8 +2,8 @@ namespace swss { -ConsumerTableBase::ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize): - TableConsumable(db->getDbId(), tableName), +ConsumerTableBase::ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize, int pri): + TableConsumable(db->getDbId(), tableName, pri), RedisTransactioner(db), POP_BATCH_SIZE(popBatchSize) { diff --git a/common/consumertablebase.h b/common/consumertablebase.h index 5580dd413..10979b8cd 100644 --- a/common/consumertablebase.h +++ b/common/consumertablebase.h @@ -10,7 +10,7 @@ class ConsumerTableBase: public TableConsumable, public RedisTransactioner public: const int POP_BATCH_SIZE; - ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE); + ConsumerTableBase(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0); virtual ~ConsumerTableBase(); diff --git a/common/logger.cpp b/common/logger.cpp index e96853a9d..0df2b88ed 100644 --- a/common/logger.cpp +++ b/common/logger.cpp @@ -166,10 +166,9 @@ Logger::Priority Logger::getMinPrio() while(true) { - int fd = 0; Selectable *selectable = nullptr; - int ret = select.select(&selectable, &fd); + int ret = select.select(&selectable); if (ret == Select::ERROR) { diff --git a/common/netlink.cpp b/common/netlink.cpp index e761770db..1db9f3af2 100644 --- a/common/netlink.cpp +++ b/common/netlink.cpp @@ -9,8 +9,8 @@ using namespace swss; using namespace std; -NetLink::NetLink() : - m_socket(NULL) +NetLink::NetLink(int pri) : + Selectable(pri), m_socket(NULL) { m_socket = nl_socket_alloc(); if (!m_socket) @@ -71,24 +71,21 @@ void NetLink::dumpRequest(int rtmGetCommand) } } -void NetLink::addFd(fd_set *fd) +int NetLink::getFd() { - FD_SET(nl_socket_get_fd(m_socket), fd); + return nl_socket_get_fd(m_socket); } -bool NetLink::isMe(fd_set *fd) +void NetLink::readData() { - return FD_ISSET(nl_socket_get_fd(m_socket), fd); -} + int err; -int NetLink::readCache() -{ - return NODATA; -} + do + { + err = nl_recvmsgs_default(m_socket); + } + while(err == -NLE_INTR); // Retry if the process was interrupted by a signal -void NetLink::readMe() -{ - int err = nl_recvmsgs_default(m_socket); if (err < 0) { if (err == -NLE_NOMEM) diff --git a/common/netlink.h b/common/netlink.h index 4ff1614e9..3cdb9d788 100644 --- a/common/netlink.h +++ b/common/netlink.h @@ -9,16 +9,14 @@ namespace swss { class NetLink : public Selectable { public: - NetLink(); + NetLink(int pri = 0); virtual ~NetLink(); void registerGroup(int rtnlGroup); void dumpRequest(int rtmGetCommand); - virtual void addFd(fd_set *fd); - virtual bool isMe(fd_set *fd); - virtual int readCache(); - virtual void readMe(); + int getFd() override; + void readData() override; private: static int onNetlinkMsg(struct nl_msg *msg, void *arg); diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index 25f49a14f..4e47211cc 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -6,7 +6,8 @@ #define REDIS_PUBLISH_MESSAGE_INDEX (2) #define REDIS_PUBLISH_MESSAGE_ELEMNTS (3) -swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel): +swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, std::string channel, int pri): + Selectable(pri), m_db(db), m_subscribe(NULL), m_channel(channel) @@ -56,11 +57,51 @@ void swss::NotificationConsumer::subscribe() SWSS_LOG_INFO("subscribed to %s", m_channel.c_str()); } -void swss::NotificationConsumer::addFd(fd_set *fd) +int swss::NotificationConsumer::getFd() +{ + return m_subscribe->getContext()->fd; +} + +void swss::NotificationConsumer::readData() { SWSS_LOG_ENTER(); - FD_SET(m_subscribe->getContext()->fd, fd); + redisReply *reply = nullptr; + + if (redisGetReply(m_subscribe->getContext(), reinterpret_cast(&reply)) != REDIS_OK) + { + SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str()); + + throw std::runtime_error("Unable to read redis reply"); + } + else + { + RedisReply r(reply); + processReply(reply); + } + + reply = nullptr; + int status; + do + { + status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast(&reply)); + if(reply != nullptr && status == REDIS_OK) + { + RedisReply r(reply); + processReply(reply); + } + } + while(reply != nullptr && status == REDIS_OK); + + if (status != REDIS_OK) + { + throw std::runtime_error("Unable to read redis reply"); + } +} + +bool swss::NotificationConsumer::hasCachedData() +{ + return m_queue.size() > 1; } void swss::NotificationConsumer::processReply(redisReply *reply) @@ -91,60 +132,11 @@ void swss::NotificationConsumer::processReply(redisReply *reply) m_queue.push(msg); } -int swss::NotificationConsumer::readCache() -{ - SWSS_LOG_ENTER(); - - if (m_queue.size() > 0) - { - return Selectable::DATA; - } - - redisReply *reply = NULL; - - if (redisGetReplyFromReader(m_subscribe->getContext(), (void**)&reply) != REDIS_OK) - { - SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str()); - - return Selectable::ERROR; - } - else if (reply != NULL) - { - RedisReply r(reply); - processReply(reply); - return Selectable::DATA; - } - - return Selectable::NODATA; -} - -void swss::NotificationConsumer::readMe() -{ - SWSS_LOG_ENTER(); - - redisReply *reply = NULL; - - if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK) - { - SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str()); - - throw std::runtime_error("Unable to read redis reply"); - } - - RedisReply r(reply); - processReply(reply); -} - -bool swss::NotificationConsumer::isMe(fd_set *fd) -{ - return FD_ISSET(m_subscribe->getContext()->fd, fd); -} - void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector &values) { SWSS_LOG_ENTER(); - if (m_queue.size() == 0) + if (m_queue.empty()) { SWSS_LOG_ERROR("notification queue is empty, can't pop"); throw std::runtime_error("notification queue is empty, can't pop"); diff --git a/common/notificationconsumer.h b/common/notificationconsumer.h index 52f1df174..2498d3cb4 100644 --- a/common/notificationconsumer.h +++ b/common/notificationconsumer.h @@ -19,16 +19,15 @@ namespace swss { class NotificationConsumer : public Selectable { public: - NotificationConsumer(swss::DBConnector *db, std::string channel); + NotificationConsumer(swss::DBConnector *db, std::string channel, int pri = 100); void pop(std::string &op, std::string &data, std::vector &values); virtual ~NotificationConsumer(); - virtual void addFd(fd_set *fd); - virtual bool isMe(fd_set *fd); - virtual int readCache(); - virtual void readMe(); + int getFd() override; + void readData() override; + bool hasCachedData() override; private: diff --git a/common/rediscommand.cpp b/common/rediscommand.cpp index 888a7b815..10d96c7d1 100644 --- a/common/rediscommand.cpp +++ b/common/rediscommand.cpp @@ -21,6 +21,7 @@ void RedisCommand::format(const char *fmt, ...) va_list ap; va_start(ap, fmt); int len = redisvFormatCommand(&temp, fmt, ap); + va_end(ap); if (len == -1) { throw std::bad_alloc(); } else if (len == -2) { diff --git a/common/redisselect.cpp b/common/redisselect.cpp index 3931426fd..6523a2048 100644 --- a/common/redisselect.cpp +++ b/common/redisselect.cpp @@ -8,51 +8,57 @@ namespace swss { -RedisSelect::RedisSelect() +RedisSelect::RedisSelect(int pri) : Selectable(pri), m_queueLength(-1) { } -void RedisSelect::addFd(fd_set *fd) +int RedisSelect::getFd() { - FD_SET(m_subscribe->getContext()->fd, fd); + return m_subscribe->getContext()->fd; } -int RedisSelect::readCache() +void RedisSelect::readData() { - redisReply *reply = NULL; + redisReply *reply = nullptr; - /* Read the messages in queue before subscribe command execute */ - if (m_queueLength) { - m_queueLength--; - return Selectable::DATA; - } + if (redisGetReply(m_subscribe->getContext(), reinterpret_cast(&reply)) != REDIS_OK) + throw std::runtime_error("Unable to read redis reply"); - if (redisGetReplyFromReader(m_subscribe->getContext(), - (void**)&reply) != REDIS_OK) - { - return Selectable::ERROR; - } else if (reply != NULL) + freeReplyObject(reply); + m_queueLength++; + + reply = nullptr; + int status; + do { - freeReplyObject(reply); - return Selectable::DATA; + status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast(&reply)); + if(reply != nullptr && status == REDIS_OK) + { + m_queueLength++; + freeReplyObject(reply); + } } + while(reply != nullptr && status == REDIS_OK); - return Selectable::NODATA; + if (status != REDIS_OK) + { + throw std::runtime_error("Unable to read redis reply"); + } } -void RedisSelect::readMe() +bool RedisSelect::hasCachedData() { - redisReply *reply = NULL; - - if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK) - throw "Unable to read redis reply"; + return m_queueLength > 1; +} - freeReplyObject(reply); +bool RedisSelect::initializedWithData() +{ + return m_queueLength > 0; } -bool RedisSelect::isMe(fd_set *fd) +void RedisSelect::updateAfterRead() { - return FD_ISSET(m_subscribe->getContext()->fd, fd); + m_queueLength--; } /* Create a new redisContext, SELECT DB and SUBSCRIBE */ diff --git a/common/redisselect.h b/common/redisselect.h index 589d99e0a..49f99e295 100644 --- a/common/redisselect.h +++ b/common/redisselect.h @@ -12,15 +12,13 @@ class RedisSelect : public Selectable /* The database is already alive and kicking, no need for more than a second */ static constexpr unsigned int SUBSCRIBE_TIMEOUT = 1000; - RedisSelect(); + RedisSelect(int pri = 0); - void addFd(fd_set *fd); - - int readCache(); - - void readMe(); - - bool isMe(fd_set *fd); + int getFd() override; + void readData() override; + bool hasCachedData() override; + bool initializedWithData() override; + void updateAfterRead() override; /* Create a new redisContext, SELECT DB and SUBSCRIBE */ void subscribe(DBConnector* db, std::string channelName); diff --git a/common/select.cpp b/common/select.cpp index f0e0cf8b3..bcfb5389e 100644 --- a/common/select.cpp +++ b/common/select.cpp @@ -5,24 +5,78 @@ #include #include #include +#include +#include +#include using namespace std; namespace swss { +Select::Select() +{ + m_epoll_fd = ::epoll_create1(0); + if (m_epoll_fd == -1) + { + std::string error = std::string("Select::constructor:epoll_create1: error=(" + + std::to_string(errno) + "}:" + + strerror(errno)); + throw std::runtime_error(error); + } +} + +Select::~Select() +{ + (void)::close(m_epoll_fd); +} + void Select::addSelectable(Selectable *selectable) { - if(find(m_objects.begin(), m_objects.end(), selectable) != m_objects.end()) + const int fd = selectable->getFd(); + + if(m_objects.find(fd) != m_objects.end()) { SWSS_LOG_WARN("Selectable is already added to the list, ignoring."); return; } - m_objects.push_back(selectable); + + m_objects[fd] = selectable; + + if (selectable->initializedWithData()) + { + m_ready.insert(selectable); + } + + struct epoll_event ev = { + .events = EPOLLIN, + .data = { .fd = fd, }, + }; + + int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev); + if (res == -1) + { + std::string error = std::string("Select::add_fd:epoll_ctl: error=(" + + std::to_string(errno) + "}:" + + strerror(errno)); + throw std::runtime_error(error); + } } -void Select::removeSelectable(Selectable *c) +void Select::removeSelectable(Selectable *selectable) { - m_objects.erase(remove(m_objects.begin(), m_objects.end(), c), m_objects.end()); + const int fd = selectable->getFd(); + + m_objects.erase(fd); + m_ready.erase(selectable); + + int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, NULL); + if (res == -1) + { + std::string error = std::string("Select::del_fd:epoll_ctl: error=(" + + std::to_string(errno) + "}:" + + strerror(errno)); + throw std::runtime_error(error); + } } void Select::addSelectables(vector selectables) @@ -33,77 +87,76 @@ void Select::addSelectables(vector selectables) } } -void Select::addFd(int fd) +int Select::poll_descriptors(Selectable **c, unsigned int timeout) { - m_fds.push_back(fd); -} + int sz_selectables = static_cast(m_objects.size()); + std::vector events(sz_selectables); + int ret; -int Select::select(Selectable **c, int *fd, unsigned int timeout) -{ - SWSS_LOG_ENTER(); + do + { + ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout); + } + while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal - struct timeval t = {0, (suseconds_t)(timeout)*1000}; - struct timeval *pTimeout = NULL; - fd_set fs; - int err; + if (ret < 0) + return Select::ERROR; - FD_ZERO(&fs); - *c = NULL; - *fd = 0; - if (timeout != numeric_limits::max()) - pTimeout = &t; + for (int i = 0; i < ret; ++i) + { + int fd = events[i].data.fd; + Selectable* sel = m_objects[fd]; + sel->readData(); + m_ready.insert(sel); + } - /* Checking caching from reader */ - for (Selectable *i : m_objects) + if (!m_ready.empty()) { - err = i->readCache(); + auto sel = *m_ready.begin(); + + *c = sel; + + m_ready.erase(sel); + // we must update clock only when the selector out of the m_ready + // otherwise we break invariant of the m_ready + sel->updateLastUsedTime(); - if (err == Selectable::ERROR) - return Select::ERROR; - else if (err == Selectable::DATA) { - *c = i; - return Select::OBJECT; + if (sel->hasCachedData()) + { + // reinsert Selectable back to the m_ready set, when there're more messages in the cache + m_ready.insert(sel); } - /* else, timeout = no data */ - i->addFd(&fs); - } + sel->updateAfterRead(); - for (int fdn : m_fds) - { - FD_SET(fdn, &fs); + return Select::OBJECT; } - do - { - err = ::select(FD_SETSIZE, &fs, NULL, NULL, pTimeout); - } - while(err == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal + return Select::TIMEOUT; +} - if (err < 0) - return Select::ERROR; - if (err == 0) - return Select::TIMEOUT; +int Select::select(Selectable **c, unsigned int timeout) +{ + SWSS_LOG_ENTER(); - /* Check other consumer-table */ - for (Selectable *i : m_objects) - if (i->isMe(&fs)) - { - i->readMe(); - *c = i; - return Select::OBJECT; - } + int ret; - /* Check other FDs */ - for (int f : m_fds) - if (FD_ISSET(f, &fs)) - { - *fd = f; - return Select::FD; - } + *c = NULL; + if (timeout == numeric_limits::max()) + timeout = -1; + + /* check if we have some data */ + ret = poll_descriptors(c, 0); + + /* return if we have data, we have an error or desired timeout was 0 */ + if (ret != Select::TIMEOUT || timeout == 0) + return ret; + + /* wait for data */ + ret = poll_descriptors(c, timeout); + + return ret; - /* Shouldn't reach here */ - return Select::ERROR; } }; diff --git a/common/select.h b/common/select.h index af10577e7..ab22e202b 100644 --- a/common/select.h +++ b/common/select.h @@ -3,6 +3,9 @@ #include #include +#include +#include +#include #include #include #include "selectable.h" @@ -12,29 +15,54 @@ namespace swss { class Select { public: + Select(); + ~Select(); + /* Add object for select */ void addSelectable(Selectable *selectable); + + /* Remove object from select */ void removeSelectable(Selectable *selectable); - void addSelectables(std::vector selectables); - /* Add file-descriptor for select */ - void addFd(int fd); + /* Add multiple objects for select */ + void addSelectables(std::vector selectables); - /* - * Wait until data will arrived, returns the object on which select() - * was signaled. - */ enum { OBJECT = 0, - FD = 1, - ERROR = 2, - TIMEOUT = 3 + ERROR = 1, + TIMEOUT = 2, }; - int select(Selectable **c, int *fd, unsigned int timeout = std::numeric_limits::max()); + + int select(Selectable **c, unsigned int timeout = std::numeric_limits::max()); private: - std::vector m_objects; - std::vector m_fds; + struct cmp + { + bool operator()(const Selectable* a, const Selectable* b) const + { + /* Choose Selectable with highest priority first */ + if (a->getPri() > b->getPri()) + return true; + else if (a->getPri() < b->getPri()) + return false; + + /* if the priorities are equal */ + /* use Selectable which was selected later */ + if (a->getLastUsedTime() < b->getLastUsedTime()) + return true; + else if (a->getLastUsedTime() > b->getLastUsedTime()) + return false; + + /* when a == b */ + return false; + } + }; + + int poll_descriptors(Selectable **c, unsigned int timeout); + + int m_epoll_fd; + std::unordered_map m_objects; + std::set m_ready; }; } diff --git a/common/selectable.h b/common/selectable.h index e14341a63..f82d489ca 100644 --- a/common/selectable.h +++ b/common/selectable.h @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace swss { @@ -11,23 +12,59 @@ namespace swss { class Selectable { public: + Selectable(int pri = 0) : m_priority(pri), + m_last_used_time(std::chrono::steady_clock::now()) {} + virtual ~Selectable() {}; - enum { - DATA = 0, - ERROR = 1, - NODATA = 2 - }; + /* return file handler for the Selectable */ + virtual int getFd() = 0; + + /* Read all data from the fd assicaited with Selectable */ + virtual void readData() = 0; + + /* true if Selectable has data in its cache */ + virtual bool hasCachedData() + { + return false; + } + + /* true if Selectable was initialized with data */ + virtual bool initializedWithData() + { + return false; + } + + /* run this function after every read */ + virtual void updateAfterRead() + { + } + + int getPri() const + { + return m_priority; + } + +private: + + friend class Select; + + // only Select class can access and update m_last_used_time + + std::chrono::time_point getLastUsedTime() const + { + return m_last_used_time; + } - /* Implements FD_SET */ - virtual void addFd(fd_set *fd) = 0; - virtual bool isMe(fd_set *fd) = 0; + void updateLastUsedTime() + { + m_last_used_time = std::chrono::steady_clock::now(); + } - /* Read and empty socket caching (if exists) */ - virtual int readCache() = 0; - /* Read a message from the socket */ - virtual void readMe() = 0; + int m_priority; // defines priority of Selectable inside Select + // higher value is higher priority + std::chrono::time_point m_last_used_time; }; } diff --git a/common/selectableevent.cpp b/common/selectableevent.cpp index c90e9076e..ed940756b 100644 --- a/common/selectableevent.cpp +++ b/common/selectableevent.cpp @@ -11,8 +11,8 @@ namespace swss { -SelectableEvent::SelectableEvent() : - m_efd(0) +SelectableEvent::SelectableEvent(int pri) : + Selectable(pri), m_efd(0) { m_efd = eventfd(0, 0); @@ -35,17 +35,12 @@ SelectableEvent::~SelectableEvent() while(err == -1 && errno == EINTR); } -void SelectableEvent::addFd(fd_set *fd) +int SelectableEvent::getFd() { - FD_SET(m_efd, fd); + return m_efd; } -int SelectableEvent::readCache() -{ - return Selectable::NODATA; -} - -void SelectableEvent::readMe() +void SelectableEvent::readData() { uint64_t r; @@ -64,11 +59,6 @@ void SelectableEvent::readMe() } } -bool SelectableEvent::isMe(fd_set *fd) -{ - return FD_ISSET(m_efd, fd); -} - void SelectableEvent::notify() { SWSS_LOG_ENTER(); diff --git a/common/selectableevent.h b/common/selectableevent.h index 37c6484b1..13e996bbd 100644 --- a/common/selectableevent.h +++ b/common/selectableevent.h @@ -12,15 +12,13 @@ namespace swss { class SelectableEvent : public Selectable { public: - SelectableEvent(); + SelectableEvent(int pri = 0); virtual ~SelectableEvent(); void notify(); - virtual void addFd(fd_set *fd); - virtual bool isMe(fd_set *fd); - virtual int readCache(); - virtual void readMe(); + int getFd() override; + void readData() override; private: int m_efd; diff --git a/common/selectabletimer.cpp b/common/selectabletimer.cpp index 736ddce72..aedc98c62 100644 --- a/common/selectabletimer.cpp +++ b/common/selectabletimer.cpp @@ -11,8 +11,8 @@ namespace swss { -SelectableTimer::SelectableTimer(const timespec& interval) - : m_zero({{0, 0}, {0, 0}}) +SelectableTimer::SelectableTimer(const timespec& interval, int pri) + : Selectable(pri), m_zero({{0, 0}, {0, 0}}) { // Create the timer m_tfd = timerfd_create(CLOCK_REALTIME, 0); @@ -67,17 +67,12 @@ void SelectableTimer::setInterval(const timespec& interval) m_interval.it_interval = interval; } -void SelectableTimer::addFd(fd_set *fd) +int SelectableTimer::getFd() { - FD_SET(m_tfd, fd); + return m_tfd; } -int SelectableTimer::readCache() -{ - return Selectable::NODATA; -} - -void SelectableTimer::readMe() +void SelectableTimer::readData() { uint64_t r; @@ -95,9 +90,6 @@ void SelectableTimer::readMe() } } -bool SelectableTimer::isMe(fd_set *fd) -{ - return FD_ISSET(m_tfd, fd); -} +// FIXME: if timer events are read slower than timer frequency we will lost time events } diff --git a/common/selectabletimer.h b/common/selectabletimer.h index ecc02359d..d6279d0c0 100644 --- a/common/selectabletimer.h +++ b/common/selectabletimer.h @@ -11,17 +11,15 @@ namespace swss { class SelectableTimer : public Selectable { public: - SelectableTimer(const timespec& interval); + SelectableTimer(const timespec& interval, int pri = 50); virtual ~SelectableTimer(); void start(); void stop(); void reset(); void setInterval(const timespec& interval); - virtual void addFd(fd_set *fd); - virtual bool isMe(fd_set *fd); - virtual int readCache(); - virtual void readMe(); + int getFd() override; + void readData() override; private: int m_tfd; diff --git a/common/subscriberstatetable.cpp b/common/subscriberstatetable.cpp index 7d07ce22d..fce7df143 100644 --- a/common/subscriberstatetable.cpp +++ b/common/subscriberstatetable.cpp @@ -14,8 +14,8 @@ using namespace std; namespace swss { -SubscriberStateTable::SubscriberStateTable(DBConnector *db, string tableName) - : ConsumerTableBase(db, tableName), m_table(db, tableName) +SubscriberStateTable::SubscriberStateTable(DBConnector *db, string tableName, int popBatchSize, int pri) + : ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName) { m_keyspace = "__keyspace@"; @@ -42,49 +42,48 @@ SubscriberStateTable::SubscriberStateTable(DBConnector *db, string tableName) } } -int SubscriberStateTable::readCache() +void SubscriberStateTable::readData() { - redisReply *reply = NULL; + redisReply *reply = nullptr; - /* If buffers already contain data notify caller about this */ - if (!m_buffer.empty() || !m_keyspace_event_buffer.empty()) + /* Read data from redis. This call is non blocking. This method + * is called from Select framework when data is available in socket. + * NOTE: All data should be stored in event buffer. It won't be possible to + * read them second time. */ + if (redisGetReply(m_subscribe->getContext(), reinterpret_cast(&reply)) != REDIS_OK) { - return Selectable::DATA; + throw std::runtime_error("Unable to read redis reply"); } + m_keyspace_event_buffer.push_back(shared_ptr(make_shared(reply))); + /* Try to read data from redis cacher. * If data exists put it to event buffer. * NOTE: Keyspace event is not persistent and it won't * be possible to read it second time. If it is not stared in * the buffer it will be lost. */ - if (redisGetReplyFromReader(m_subscribe->getContext(), - (void**)&reply) != REDIS_OK) + + reply = nullptr; + int status; + do { - return Selectable::ERROR; + status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast(&reply)); + if(reply != nullptr && status == REDIS_OK) + { + m_keyspace_event_buffer.push_back(shared_ptr(make_shared(reply))); + } } - else if (reply != NULL) + while(reply != nullptr && status == REDIS_OK); + + if (status != REDIS_OK) { - m_keyspace_event_buffer.push_back(shared_ptr(make_shared(reply))); - return Selectable::DATA; + throw std::runtime_error("Unable to read redis reply"); } - - return Selectable::NODATA; } -void SubscriberStateTable::readMe() +bool SubscriberStateTable::hasCachedData() { - redisReply *reply = NULL; - - /* Read data from redis. This call is non blocking. This method - * is called from Select framework when data is available in socket. - * NOTE: All data should be stored in event buffer. It won't be possible to - * read them second time. */ - if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK) - { - throw runtime_error("Unable to read redis reply"); - } - - m_keyspace_event_buffer.push_back(shared_ptr(make_shared(reply))); + return m_buffer.size() > 1 || m_keyspace_event_buffer.size() > 1; } void SubscriberStateTable::pops(deque &vkco, string /*prefix*/) diff --git a/common/subscriberstatetable.h b/common/subscriberstatetable.h index 1871df3a7..5d47e2cdf 100644 --- a/common/subscriberstatetable.h +++ b/common/subscriberstatetable.h @@ -11,15 +11,18 @@ namespace swss { class SubscriberStateTable : public ConsumerTableBase { public: - SubscriberStateTable(DBConnector *db, std::string tableName); + SubscriberStateTable(DBConnector *db, std::string tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0); /* Get all elements available */ void pops(std::deque &vkco, std::string prefix = EMPTY_PREFIX); - /* Verify if cache contains data */ - int readCache(); /* Read keyspace event from redis */ - void readMe(); + void readData() override; + bool hasCachedData() override; + bool initializedWithData() override + { + return !m_buffer.empty(); + } private: /* Pop keyspace event from event buffer. Caller should free resources. */ diff --git a/common/table.h b/common/table.h index 957e37c2a..762fec972 100644 --- a/common/table.h +++ b/common/table.h @@ -108,7 +108,7 @@ class TableConsumable : public TableBase, public TableEntryPoppable, public Redi /* The default value of pop batch size is 128 */ static constexpr int DEFAULT_POP_BATCH_SIZE = 128; - TableConsumable(int dbId, std::string tableName) : TableBase(dbId, tableName) { } + TableConsumable(int dbId, std::string tableName, int pri) : TableBase(dbId, tableName), RedisSelect(pri) { } }; class TableEntryEnumerable { diff --git a/tests/Makefile.am b/tests/Makefile.am index 0bdf874fa..92ecf388a 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,9 +24,10 @@ tests_SOURCES = redis_ut.cpp \ macaddress_ut.cpp \ converter_ut.cpp \ exec_ut.cpp \ - redis_subscriber_state_ut.cpp + redis_subscriber_state_ut.cpp \ + selectable_priority.cpp -tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) -tests_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) -tests_LDADD = $(LDADD_GTEST) -lpthread -L$(top_srcdir)/common -lswsscommon +tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(LIBNL_CFLAGS) +tests_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(LIBNL_CFLAGS) +tests_LDADD = $(LDADD_GTEST) -lpthread -L$(top_srcdir)/common -lswsscommon $(LIBNL_LIBS) diff --git a/tests/ntf_ut.cpp b/tests/ntf_ut.cpp index bff70a3e6..5264cc294 100644 --- a/tests/ntf_ut.cpp +++ b/tests/ntf_ut.cpp @@ -33,9 +33,7 @@ void ntf_thread() { swss::Selectable *sel; - int fd; - - int result = s.select(&sel, &fd); + int result = s.select(&sel); if (result == swss::Select::OBJECT) { diff --git a/tests/redis_piped_state_ut.cpp b/tests/redis_piped_state_ut.cpp index 123ea304c..6c7619dfe 100644 --- a/tests/redis_piped_state_ut.cpp +++ b/tests/redis_piped_state_ut.cpp @@ -108,14 +108,13 @@ static void consumerWorker(int index) ConsumerStateTable c(&db, tableName); Select cs; Selectable *selectcs; - int tmpfd; int numberOfKeysSet = 0; int numberOfKeyDeleted = 0; int ret, i = 0; KeyOpFieldsValuesTuple kco; cs.addSelectable(&c); - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); if (kfvOp(kco) == "SET") @@ -135,7 +134,7 @@ static void consumerWorker(int index) } EXPECT_LE(numberOfKeysSet, numberOfKeyDeleted); - EXPECT_EQ(ret, Selectable::DATA); + EXPECT_EQ(ret, Select::OBJECT); } static inline void clearDB() @@ -186,11 +185,10 @@ TEST(ConsumerStateTable, async_double_set) Select cs; Selectable *selectcs; cs.addSelectable(&c); - int tmpfd; /* First pop operation */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -218,7 +216,7 @@ TEST(ConsumerStateTable, async_double_set) /* Second select operation */ { - int ret = cs.select(&selectcs, &tmpfd, 1000); + int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } @@ -256,11 +254,10 @@ TEST(ConsumerStateTable, async_set_del) Select cs; Selectable *selectcs; cs.addSelectable(&c); - int tmpfd; /* First pop operation */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -273,7 +270,7 @@ TEST(ConsumerStateTable, async_set_del) /* Second select operation */ { - int ret = cs.select(&selectcs, &tmpfd, 1000); + int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } @@ -322,11 +319,10 @@ TEST(ConsumerStateTable, async_set_del_set) Select cs; Selectable *selectcs; cs.addSelectable(&c); - int tmpfd; /* First pop operation */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -350,7 +346,7 @@ TEST(ConsumerStateTable, async_set_del_set) /* Second select operation */ { - int ret = cs.select(&selectcs, &tmpfd, 1000); + int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } @@ -384,13 +380,12 @@ TEST(ConsumerStateTable, async_singlethread) ConsumerStateTable c(&db, tableName); Select cs; Selectable *selectcs; - int tmpfd; int ret, i = 0; KeyOpFieldsValuesTuple kco; cs.addSelectable(&c); int numberOfKeysSet = 0; - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); EXPECT_EQ(kfvOp(kco), "SET"); @@ -413,7 +408,7 @@ TEST(ConsumerStateTable, async_singlethread) p.flush(); int numberOfKeyDeleted = 0; - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); EXPECT_EQ(kfvOp(kco), "DEL"); @@ -427,7 +422,7 @@ TEST(ConsumerStateTable, async_singlethread) } EXPECT_LE(numberOfKeysSet, numberOfKeyDeleted); - EXPECT_EQ(ret, Selectable::DATA); + EXPECT_EQ(ret, Select::OBJECT); cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl; @@ -490,9 +485,8 @@ TEST(ConsumerStateTable, async_multitable) while (1) { Selectable *is; - int fd; - ret = cs.select(&is, &fd); + ret = cs.select(&is); EXPECT_EQ(ret, Select::OBJECT); ((ConsumerStateTable *)is)->pop(kco); diff --git a/tests/redis_piped_ut.cpp b/tests/redis_piped_ut.cpp index 5c4f77472..9d2328b77 100644 --- a/tests/redis_piped_ut.cpp +++ b/tests/redis_piped_ut.cpp @@ -106,14 +106,13 @@ static void consumerWorker(int index) ConsumerTable c(&db, tableName); Select cs; Selectable *selectcs; - int tmpfd; int numberOfKeysSet = 0; int numberOfKeyDeleted = 0; int ret, i = 0; KeyOpFieldsValuesTuple kco; cs.addSelectable(&c); - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); if (kfvOp(kco) == "SET") @@ -133,7 +132,7 @@ static void consumerWorker(int index) break; } - EXPECT_EQ(ret, Selectable::DATA); + EXPECT_EQ(ret, Select::OBJECT); } static void clearDB() @@ -199,9 +198,8 @@ TEST(DBConnector, piped_multitable) while (1) { Selectable *is; - int fd; - ret = cs.select(&is, &fd); + ret = cs.select(&is); EXPECT_EQ(ret, Select::OBJECT); ((ConsumerTable *)is)->pop(kco); @@ -255,13 +253,13 @@ TEST(DBConnector, piped_notifications) Select s; s.addSelectable(&nc); Selectable *sel; - int fd, value = 1; + int value = 1; clearDB(); thread np(notificationProducer); - int result = s.select(&sel, &fd, 2000); + int result = s.select(&sel, 2000); if (result == Select::OBJECT) { cout << "Got notification from producer" << endl; @@ -291,11 +289,10 @@ static void selectableEventThread(Selectable *ev, int *value) Select s; s.addSelectable(ev); Selectable *sel; - int fd; cout << "Starting listening ... " << endl; - int result = s.select(&sel, &fd, 2000); + int result = s.select(&sel, 2000); if (result == Select::OBJECT) { if (sel == ev) diff --git a/tests/redis_state_ut.cpp b/tests/redis_state_ut.cpp index 90beaab8e..4a3956e56 100644 --- a/tests/redis_state_ut.cpp +++ b/tests/redis_state_ut.cpp @@ -106,14 +106,13 @@ static void consumerWorker(int index) ConsumerStateTable c(&db, tableName); Select cs; Selectable *selectcs; - int tmpfd; int numberOfKeysSet = 0; int numberOfKeyDeleted = 0; int ret, i = 0; KeyOpFieldsValuesTuple kco; cs.addSelectable(&c); - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); if (kfvOp(kco) == "SET") @@ -133,7 +132,7 @@ static void consumerWorker(int index) } EXPECT_LE(numberOfKeysSet, numberOfKeyDeleted); - EXPECT_EQ(ret, Selectable::DATA); + EXPECT_EQ(ret, Select::OBJECT); } static inline void clearDB() @@ -182,11 +181,10 @@ TEST(ConsumerStateTable, double_set) Select cs; Selectable *selectcs; cs.addSelectable(&c); - int tmpfd; /* First pop operation */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -214,7 +212,7 @@ TEST(ConsumerStateTable, double_set) /* Second select operation */ { - int ret = cs.select(&selectcs, &tmpfd, 1000); + int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } @@ -250,11 +248,10 @@ TEST(ConsumerStateTable, set_del) Select cs; Selectable *selectcs; cs.addSelectable(&c); - int tmpfd; /* First pop operation */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -267,7 +264,7 @@ TEST(ConsumerStateTable, set_del) /* Second select operation */ { - int ret = cs.select(&selectcs, &tmpfd, 1000); + int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } @@ -314,11 +311,10 @@ TEST(ConsumerStateTable, set_del_set) Select cs; Selectable *selectcs; cs.addSelectable(&c); - int tmpfd; /* First pop operation */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -342,7 +338,7 @@ TEST(ConsumerStateTable, set_del_set) /* Second select operation */ { - int ret = cs.select(&selectcs, &tmpfd, 1000); + int ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } @@ -374,13 +370,12 @@ TEST(ConsumerStateTable, singlethread) ConsumerStateTable c(&db, tableName); Select cs; Selectable *selectcs; - int tmpfd; int ret, i = 0; KeyOpFieldsValuesTuple kco; cs.addSelectable(&c); int numberOfKeysSet = 0; - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); EXPECT_EQ(kfvOp(kco), "SET"); @@ -402,7 +397,7 @@ TEST(ConsumerStateTable, singlethread) } int numberOfKeyDeleted = 0; - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); EXPECT_EQ(kfvOp(kco), "DEL"); @@ -416,7 +411,7 @@ TEST(ConsumerStateTable, singlethread) } EXPECT_LE(numberOfKeysSet, numberOfKeyDeleted); - EXPECT_EQ(ret, Selectable::DATA); + EXPECT_EQ(ret, Select::OBJECT); cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl; @@ -479,9 +474,8 @@ TEST(ConsumerStateTable, multitable) while (1) { Selectable *is; - int fd; - ret = cs.select(&is, &fd); + ret = cs.select(&is); EXPECT_EQ(ret, Select::OBJECT); ((ConsumerStateTable *)is)->pop(kco); diff --git a/tests/redis_subscriber_state_ut.cpp b/tests/redis_subscriber_state_ut.cpp index db2a295d8..2b3c7a014 100644 --- a/tests/redis_subscriber_state_ut.cpp +++ b/tests/redis_subscriber_state_ut.cpp @@ -128,7 +128,6 @@ static void subscriberWorker(int index, int *status) SubscriberStateTable c(&db, testTableName); Select cs; Selectable *selectcs; - int tmpfd; int numberOfKeysSet = 0; int numberOfKeyDeleted = 0; int ret, i = 0; @@ -138,7 +137,7 @@ static void subscriberWorker(int index, int *status) status[index] = 1; - while ((ret = cs.select(&selectcs, &tmpfd, 10000)) == Select::OBJECT) + while ((ret = cs.select(&selectcs, 10000)) == Select::OBJECT) { c.pop(kco); if (kfvOp(kco) == "SET") @@ -167,7 +166,7 @@ static void subscriberWorker(int index, int *status) /* Verify that all data are read */ { - ret = cs.select(&selectcs, &tmpfd, 1000); + ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } @@ -200,11 +199,9 @@ TEST(SubscriberStateTable, set) p.set(key, fields); } - int tmpfd; - /* Pop operation */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -255,11 +252,9 @@ TEST(SubscriberStateTable, del) p.set(key, fields); } - int tmpfd; - /* Pop operation for set */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -271,7 +266,7 @@ TEST(SubscriberStateTable, del) /* Pop operation for del */ { - int ret = cs.select(&selectcs, &tmpfd); + int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); KeyOpFieldsValuesTuple kco; c.pop(kco); @@ -311,14 +306,13 @@ TEST(SubscriberStateTable, table_state) SubscriberStateTable c(&db, testTableName); Select cs; Selectable *selectcs; - int tmpfd; int ret, i = 0; KeyOpFieldsValuesTuple kco; cs.addSelectable(&c); int numberOfKeysSet = 0; - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); EXPECT_EQ(kfvOp(kco), "SET"); @@ -336,7 +330,7 @@ TEST(SubscriberStateTable, table_state) /* Verify that all data are read */ { - ret = cs.select(&selectcs, &tmpfd, 1000); + ret = cs.select(&selectcs, 1000); EXPECT_EQ(ret, Select::TIMEOUT); } } diff --git a/tests/redis_ut.cpp b/tests/redis_ut.cpp index 675b8b369..544646104 100644 --- a/tests/redis_ut.cpp +++ b/tests/redis_ut.cpp @@ -106,14 +106,13 @@ void consumerWorker(int index) ConsumerTable c(&db, tableName); Select cs; Selectable *selectcs; - int tmpfd; int numberOfKeysSet = 0; int numberOfKeyDeleted = 0; int ret, i = 0; KeyOpFieldsValuesTuple kco; cs.addSelectable(&c); - while ((ret = cs.select(&selectcs, &tmpfd)) == Select::OBJECT) + while ((ret = cs.select(&selectcs)) == Select::OBJECT) { c.pop(kco); if (kfvOp(kco) == "SET") @@ -133,7 +132,7 @@ void consumerWorker(int index) break; } - EXPECT_EQ(ret, Selectable::DATA); + EXPECT_EQ(ret, Select::OBJECT); } void clearDB() @@ -288,9 +287,8 @@ TEST(DBConnector, multitable) while (1) { Selectable *is; - int fd; - ret = cs.select(&is, &fd); + ret = cs.select(&is); EXPECT_EQ(ret, Select::OBJECT); ((ConsumerTable *)is)->pop(kco); @@ -344,13 +342,13 @@ TEST(DBConnector, notifications) Select s; s.addSelectable(&nc); Selectable *sel; - int fd, value = 1; + int value = 1; clearDB(); thread np(notificationProducer); - int result = s.select(&sel, &fd, 2000); + int result = s.select(&sel, 2000); if (result == Select::OBJECT) { cout << "Got notification from producer" << endl; @@ -380,11 +378,10 @@ void selectableEventThread(Selectable *ev, int *value) Select s; s.addSelectable(ev); Selectable *sel; - int fd; cout << "Starting listening ... " << endl; - int result = s.select(&sel, &fd, 2000); + int result = s.select(&sel, 2000); if (result == Select::OBJECT) { if (sel == ev) @@ -419,30 +416,30 @@ TEST(DBConnector, selectabletimer) Select s; s.addSelectable(&timer); Selectable *sel; - int fd, result; + int result; // Wait a non started timer - result = s.select(&sel, &fd, 2000); + result = s.select(&sel, 2000); ASSERT_EQ(result, Select::TIMEOUT); // Wait long enough so we got timer notification first timer.start(); - result = s.select(&sel, &fd, 2000); + result = s.select(&sel, 2000); ASSERT_EQ(result, Select::OBJECT); ASSERT_EQ(sel, &timer); // Wait short so we got select timeout first - result = s.select(&sel, &fd, 10); + result = s.select(&sel, 10); ASSERT_EQ(result, Select::TIMEOUT); // Wait long enough so we got timer notification first - result = s.select(&sel, &fd, 10000); + result = s.select(&sel, 10000); ASSERT_EQ(result, Select::OBJECT); ASSERT_EQ(sel, &timer); // Reset and wait long enough so we got timer notification first timer.reset(); - result = s.select(&sel, &fd, 10000); + result = s.select(&sel, 10000); ASSERT_EQ(result, Select::OBJECT); ASSERT_EQ(sel, &timer); } diff --git a/tests/selectable_priority.cpp b/tests/selectable_priority.cpp new file mode 100644 index 000000000..1c33e087d --- /dev/null +++ b/tests/selectable_priority.cpp @@ -0,0 +1,250 @@ +#include "common/dbconnector.h" +#include "common/consumertable.h" +#include "common/notificationconsumer.h" +#include "common/select.h" +#include "common/selectableevent.h" +#include "common/selectabletimer.h" +#include "common/subscriberstatetable.h" +#include "common/netmsg.h" +#include "common/netlink.h" +#include "gtest/gtest.h" + + +using namespace std; +using namespace swss; + + +#define TEST_VIEW (7) +#define DEFAULT_POP_BATCH_SIZE (10) + + +TEST(Priority, default_pri_values) +{ + std::string tableName = "tableName"; + + DBConnector db(TEST_VIEW, "localhost", 6379, 0); + + timespec interval = { .tv_sec = 1, .tv_nsec = 0 }; + + NetLink nl; + ConsumerStateTable cst(&db, tableName); + ConsumerTable ct(&db, tableName); + NotificationConsumer nc(&db, tableName); + RedisSelect rs; + SelectableEvent se; + SelectableTimer st(interval); + SubscriberStateTable sst(&db, tableName); + + EXPECT_EQ(nl.getPri(), 0); + EXPECT_EQ(cst.getPri(), 0); + EXPECT_EQ(ct.getPri(), 0); + EXPECT_EQ(nc.getPri(), 100); + EXPECT_EQ(rs.getPri(), 0); + EXPECT_EQ(se.getPri(), 0); + EXPECT_EQ(st.getPri(), 50); + EXPECT_EQ(sst.getPri(), 0); +} + +TEST(Priority, set_pri_values) +{ + std::string tableName = "tableName"; + + DBConnector db(TEST_VIEW, "localhost", 6379, 0); + + timespec interval = { .tv_sec = 1, .tv_nsec = 0 }; + + NetLink nl(101); + ConsumerStateTable cst(&db, tableName, DEFAULT_POP_BATCH_SIZE, 102); + ConsumerTable ct(&db, tableName, DEFAULT_POP_BATCH_SIZE, 103); + NotificationConsumer nc(&db, tableName, 104); + RedisSelect rs(105); + SelectableEvent se(106); + SelectableTimer st(interval, 107); + SubscriberStateTable sst(&db, tableName, DEFAULT_POP_BATCH_SIZE, 108); + + EXPECT_EQ(nl.getPri(), 101); + EXPECT_EQ(cst.getPri(), 102); + EXPECT_EQ(ct.getPri(), 103); + EXPECT_EQ(nc.getPri(), 104); + EXPECT_EQ(rs.getPri(), 105); + EXPECT_EQ(se.getPri(), 106); + EXPECT_EQ(st.getPri(), 107); + EXPECT_EQ(sst.getPri(), 108); +} + +TEST(Priority, priority_select_1) +{ + Select cs; + Selectable *selectcs; + + SelectableEvent s1(100); + SelectableEvent s2(1000); + SelectableEvent s3(10000); + + cs.addSelectable(&s1); + cs.addSelectable(&s2); + cs.addSelectable(&s3); + + s1.notify(); + s2.notify(); + s3.notify(); + + int ret; + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s3); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s2); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s1); +} + +TEST(Priority, priority_select_2) +{ + Select cs; + Selectable *selectcs; + + SelectableEvent s1(100); + SelectableEvent s2(1000); + SelectableEvent s3(10000); + + s1.notify(); + s2.notify(); + s3.notify(); + + cs.addSelectable(&s1); + cs.addSelectable(&s2); + cs.addSelectable(&s3); + + int ret; + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s3); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s2); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s1); +} + +TEST(Priority, priority_select_3) +{ + Select cs; + Selectable *selectcs; + + SelectableEvent s1(10); + SelectableEvent s2(10); + SelectableEvent s3(10000); + + cs.addSelectable(&s1); + cs.addSelectable(&s2); + cs.addSelectable(&s3); + + s1.notify(); + s2.notify(); + s3.notify(); + + int ret; + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s3); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_TRUE(selectcs==&s1 || selectcs==&s2); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_TRUE(selectcs==&s1 || selectcs==&s2); +} + +TEST(Priority, priority_select_4) +{ + Select cs; + Selectable *selectcs; + + SelectableEvent s1(10); + SelectableEvent s2(10000); + SelectableEvent s3(10000); + + cs.addSelectable(&s1); + cs.addSelectable(&s2); + cs.addSelectable(&s3); + + s1.notify(); + s2.notify(); + s3.notify(); + + int ret; + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_TRUE(selectcs==&s2 || selectcs==&s3); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_TRUE(selectcs==&s2 || selectcs==&s3); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s1); +} + +TEST(Priority, priority_select_5) +{ + Select cs; + Selectable *selectcs; + + SelectableEvent s1(150); + SelectableEvent s2(1000); + + cs.addSelectable(&s1); + cs.addSelectable(&s2); + + s1.notify(); + s2.notify(); + + int ret; + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + EXPECT_EQ(selectcs, &s2); + + cs.removeSelectable(&s1); + + ret = cs.select(&selectcs, 1000); + EXPECT_EQ(ret, Select::TIMEOUT); +} + +TEST(Priority, priority_select_6) +{ + Select cs; + Selectable *selectcs1; + Selectable *selectcs2; + + SelectableEvent s1(1000); + SelectableEvent s2(1000); + + cs.addSelectable(&s1); + cs.addSelectable(&s2); + + s1.notify(); + s2.notify(); + + int ret; + ret = cs.select(&selectcs1); + EXPECT_EQ(ret, Select::OBJECT); + + s1.notify(); + s2.notify(); + + ret = cs.select(&selectcs2); + EXPECT_EQ(ret, Select::OBJECT); + // we gave fair scheduler. we've read different selectables on the second read + EXPECT_NE(selectcs1, selectcs2); +} diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index 18e122d1a..9e60670d7 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -34,7 +34,7 @@ def test_SubscriberStateTable(): sel.addSelectable(cst) fvs = swsscommon.FieldValuePairs([('a','b')]) t.set("aaa", fvs) - (state, c, fd) = sel.select() + (state, c) = sel.select() assert state == swsscommon.Select.OBJECT (key, op, cfvs) = cst.pop() assert key == "aaa" @@ -50,7 +50,7 @@ def test_Notification(): fvs = swsscommon.FieldValuePairs([('a','b')]) ntfp = swsscommon.NotificationProducer(db, "testntf") ntfp.send("aaa", "bbb", fvs) - (state, c, fd) = sel.select() + (state, c) = sel.select() assert state == swsscommon.Select.OBJECT (op, data, cfvs) = ntfc.pop() assert op == "aaa"