Skip to content

Commit

Permalink
Topic subscription scalability fix (#1217)
Browse files Browse the repository at this point in the history
Switching to using epoll system calls to improve performance of the topic polling code by a factor of 2. This required disabling the addDelMultiThread test.
  • Loading branch information
mikepurvis authored Dec 21, 2017
1 parent 2a8015f commit 9c0db37
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 36 deletions.
3 changes: 3 additions & 0 deletions clients/roscpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ catkin_package(

include(CheckIncludeFiles)
include(CheckFunctionExists)
include(CheckCXXSymbolExists)

# Not everybody has <ifaddrs.h> (e.g., embedded arm-linux)
CHECK_INCLUDE_FILES(ifaddrs.h HAVE_IFADDRS_H)
# Not everybody has trunc (e.g., Windows, embedded arm-linux)
CHECK_FUNCTION_EXISTS(trunc HAVE_TRUNC)
# Not everybody has epoll (e.g., Windows, BSD, embedded arm-linux)
CHECK_CXX_SYMBOL_EXISTS(epoll_wait "sys/epoll.h" HAVE_EPOLL)

# Output test results to config.h
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/libros/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
Expand Down
10 changes: 9 additions & 1 deletion clients/roscpp/include/ros/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,26 @@ namespace ros {
typedef struct pollfd socket_pollfd;
#endif

typedef boost::shared_ptr<std::vector<socket_pollfd> > pollfd_vector_ptr;

/*****************************************************************************
** Functions
*****************************************************************************/

ROSCPP_DECL int last_socket_error();
ROSCPP_DECL const char* last_socket_error_string();
ROSCPP_DECL bool last_socket_error_is_would_block();
ROSCPP_DECL int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout);
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout);
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket);
ROSCPP_DECL int close_socket(socket_fd_t &socket);
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2]);

ROSCPP_DECL int create_socket_watcher();
ROSCPP_DECL void close_socket_watcher(int fd);
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd);
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd);
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events);

/*****************************************************************************
** Inlines - almost direct api replacements, should stay fast.
*****************************************************************************/
Expand Down
2 changes: 2 additions & 0 deletions clients/roscpp/include/ros/poll_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class ROSCPP_DECL PollSet

boost::mutex signal_mutex_;
signal_fd_t signal_pipe_[2];

int epfd_;
};

}
Expand Down
1 change: 1 addition & 0 deletions clients/roscpp/src/libros/config.h.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#cmakedefine HAVE_TRUNC
#cmakedefine HAVE_IFADDRS_H
#cmakedefine HAVE_EPOLL
148 changes: 136 additions & 12 deletions clients/roscpp/src/libros/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
** Includes
*****************************************************************************/

#include "config.h"

#include <ros/io.h>
#include <ros/assert.h> // don't need if we dont call the pipe functions.
#include <errno.h> // for EFAULT and co.
Expand All @@ -46,6 +48,16 @@
#include <fcntl.h> // for non-blocking configuration
#endif

#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif

/*****************************************************************************
** Macros
*****************************************************************************/

#define UNUSED(expr) do { (void)(expr); } while (0)

/*****************************************************************************
** Namespaces
*****************************************************************************/
Expand Down Expand Up @@ -87,6 +99,69 @@ bool last_socket_error_is_would_block() {
#endif
}

int create_socket_watcher() {
int epfd = -1;
#if defined(HAVE_EPOLL)
epfd = ::epoll_create1(0);
if (epfd < 0)
{
ROS_ERROR("Unable to create epoll watcher: %s", strerror(errno));
}
#endif
return epfd;
}

void close_socket_watcher(int fd) {
if (fd >= 0)
::close(fd);
}

void add_socket_to_watcher(int epfd, int fd) {
#if defined(HAVE_EPOLL)
struct epoll_event ev;
ev.events = 0;
ev.data.fd = fd;

if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev))
{
ROS_ERROR("Unable to add FD to epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
#endif
}

void del_socket_from_watcher(int epfd, int fd) {
#if defined(HAVE_EPOLL)
if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL))
{
ROS_ERROR("Unable to remove FD to epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
#endif
}

void set_events_on_socket(int epfd, int fd, int events) {
#if defined(HAVE_EPOLL)
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev))
{
ROS_ERROR("Unable to modify FD epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
UNUSED(events);
#endif
}



/*****************************************************************************
** Service Robotics/Libssh Functions
*****************************************************************************/
Expand All @@ -96,22 +171,26 @@ bool last_socket_error_is_would_block() {
* Windows doesn't have a polling function until Vista (WSAPoll) and even then
* its implementation is not supposed to be great. This works for a broader
* range of platforms and will suffice.
* @param epfd - the socket watcher to poll on.
* @param fds - the socket set (descriptor's and events) to poll for.
* @param nfds - the number of descriptors to poll for.
* @param timeout - timeout in milliseconds.
* @return int : -1 on error, 0 on timeout, +ve number of structures with received events.
* @return pollfd_vector_ptr : NULL on error, empty on timeout, a list of structures with received events.
*/
int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {
pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) {
#if defined(WIN32)
fd_set readfds, writefds, exceptfds;
struct timeval tv, *ptv;
socket_fd_t max_fd;
int rc;
nfds_t i;
boost::shared_ptr<std::vector<socket_pollfd> > ofds;

UNUSED(epfd);

if (fds == NULL) {
errno = EFAULT;
return -1;
return ofds;
}

FD_ZERO (&readfds);
Expand Down Expand Up @@ -146,7 +225,7 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {

if (rc == -1) {
errno = EINVAL;
return -1;
return ofds;
}
/*********************
** Setting the timeout
Expand All @@ -166,9 +245,11 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {

rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv);
if (rc < 0) {
return -1;
} else if ( rc == 0 ) {
return 0;
return ofds;
}
ofds.reset(new std::vector<socket_pollfd>);
if ( rc == 0 ) {
return ofds;
}

for (rc = 0, i = 0; i < nfds; i++) {
Expand Down Expand Up @@ -213,18 +294,61 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {
} else {
fds[i].revents = POLLNVAL;
}
ofds->push_back(fds[i]);
}
return ofds;
#elif defined (HAVE_EPOLL)
UNUSED(nfds);
UNUSED(fds);
struct epoll_event ev[nfds];
pollfd_vector_ptr ofds;

int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout);

if (fd_cnt < 0)
{
// EINTR means that we got interrupted by a signal, and is not an error
if(errno != EINTR) {
ROS_ERROR("Error in epoll_wait! %s", strerror(errno));
ofds.reset();
}
}
return rc;
else
{
ofds.reset(new std::vector<socket_pollfd>);
for (int i = 0; i < fd_cnt; i++)
{
socket_pollfd pfd;
pfd.fd = ev[i].data.fd;
pfd.revents = ev[i].events;
ofds->push_back(pfd);
}
}
return ofds;
#else
UNUSED(epfd);
pollfd_vector_ptr ofds(new std::vector<socket_pollfd>);
// use an existing poll implementation
int result = poll(fds, nfds, timeout);
if ( result < 0 ) {
if ( result < 0 )
{
// EINTR means that we got interrupted by a signal, and is not an error
if(errno == EINTR) {
result = 0;
if(errno != EINTR)
{
ROS_ERROR("Error in poll! %s", strerror(errno));
ofds.reset();
}
} else {
for (nfds_t i = 0; i < nfds; i++)
{
if (fds[i].revents)
{
ofds->push_back(fds[i]);
fds[i].revents = 0;
}
}
}
return result;
return ofds;
#endif // poll_sockets functions
}
/*****************************************************************************
Expand Down
Loading

0 comments on commit 9c0db37

Please sign in to comment.