diff --git a/clients/roscpp/CMakeLists.txt b/clients/roscpp/CMakeLists.txt index ad6ccda1c2..af68f8e13a 100644 --- a/clients/roscpp/CMakeLists.txt +++ b/clients/roscpp/CMakeLists.txt @@ -60,11 +60,14 @@ catkin_package( include(CheckIncludeFiles) include(CheckFunctionExists) +include(CheckCXXSymbolExists) # Not everybody has (e.g., embedded arm-linux) CHECK_INCLUDE_FILES(ifaddrs.h HAVE_IFADDRS_H) # Not everybody has trunc (e.g., Windows, embedded arm-linux) CHECK_FUNCTION_EXISTS(trunc HAVE_TRUNC) +# Not everybody has epoll (e.g., Windows, BSD, embedded arm-linux) +CHECK_CXX_SYMBOL_EXISTS(epoll_wait "sys/epoll.h" HAVE_EPOLL) # Output test results to config.h configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/libros/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h) diff --git a/clients/roscpp/include/ros/callback_queue.h b/clients/roscpp/include/ros/callback_queue.h index 7afb682692..4fe811d745 100644 --- a/clients/roscpp/include/ros/callback_queue.h +++ b/clients/roscpp/include/ros/callback_queue.h @@ -35,6 +35,21 @@ #ifndef ROSCPP_CALLBACK_QUEUE_H #define ROSCPP_CALLBACK_QUEUE_H +// check if we might need to include our own backported version boost::condition_variable +// in order to use CLOCK_MONOTONIC for the condition variable +// the include order here is important! +#ifdef BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC +#include +#if BOOST_VERSION < 106100 +// use backported version of boost condition variable, see https://svn.boost.org/trac/boost/ticket/6377 +#include "boost_161_condition_variable.h" +#else // Boost version is 1.61 or greater and has the steady clock fixes +#include +#endif +#else // !BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC +#include +#endif // BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + #include "ros/callback_queue_interface.h" #include "ros/time.h" #include "common.h" @@ -42,7 +57,6 @@ #include #include #include -#include #include #include diff --git a/clients/roscpp/include/ros/io.h b/clients/roscpp/include/ros/io.h index fc38cbca2e..4414212127 100644 --- a/clients/roscpp/include/ros/io.h +++ b/clients/roscpp/include/ros/io.h @@ -138,6 +138,8 @@ namespace ros { typedef struct pollfd socket_pollfd; #endif +typedef boost::shared_ptr > pollfd_vector_ptr; + /***************************************************************************** ** Functions *****************************************************************************/ @@ -145,11 +147,17 @@ namespace ros { ROSCPP_DECL int last_socket_error(); ROSCPP_DECL const char* last_socket_error_string(); ROSCPP_DECL bool last_socket_error_is_would_block(); -ROSCPP_DECL int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout); +ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout); ROSCPP_DECL int set_non_blocking(socket_fd_t &socket); ROSCPP_DECL int close_socket(socket_fd_t &socket); ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2]); +ROSCPP_DECL int create_socket_watcher(); +ROSCPP_DECL void close_socket_watcher(int fd); +ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd); +ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd); +ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events); + /***************************************************************************** ** Inlines - almost direct api replacements, should stay fast. *****************************************************************************/ diff --git a/clients/roscpp/include/ros/poll_set.h b/clients/roscpp/include/ros/poll_set.h index 4f1091d59f..5a6ede20d1 100644 --- a/clients/roscpp/include/ros/poll_set.h +++ b/clients/roscpp/include/ros/poll_set.h @@ -147,6 +147,8 @@ class ROSCPP_DECL PollSet boost::mutex signal_mutex_; signal_fd_t signal_pipe_[2]; + + int epfd_; }; } diff --git a/clients/roscpp/src/libros/callback_queue.cpp b/clients/roscpp/src/libros/callback_queue.cpp index 5827646ae0..b2bab574b3 100644 --- a/clients/roscpp/src/libros/callback_queue.cpp +++ b/clients/roscpp/src/libros/callback_queue.cpp @@ -32,10 +32,23 @@ * POSSIBILITY OF SUCH DAMAGE. */ +// Make sure we use CLOCK_MONOTONIC for the condition variable wait_for if not Apple. +#ifndef __APPLE__ +#define BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC +#endif + #include "ros/callback_queue.h" #include "ros/assert.h" #include +// check if we have really included the backported boost condition variable +// just in case someone messes with the include order... +#if BOOST_VERSION < 106100 +#ifndef USING_BACKPORTED_BOOST_CONDITION_VARIABLE +#error "needs boost version >= 1.61 or the backported headers!" +#endif +#endif + namespace ros { @@ -229,7 +242,7 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout) { if (!timeout.isZero()) { - condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f)); + condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec())); } if (callbacks_.empty()) @@ -305,7 +318,7 @@ void CallbackQueue::callAvailable(ros::WallDuration timeout) { if (!timeout.isZero()) { - condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f)); + condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec())); } if (callbacks_.empty() || !enabled_) diff --git a/clients/roscpp/src/libros/config.h.in b/clients/roscpp/src/libros/config.h.in index 7788518c3c..2ec486310c 100644 --- a/clients/roscpp/src/libros/config.h.in +++ b/clients/roscpp/src/libros/config.h.in @@ -1,2 +1,3 @@ #cmakedefine HAVE_TRUNC #cmakedefine HAVE_IFADDRS_H +#cmakedefine HAVE_EPOLL diff --git a/clients/roscpp/src/libros/init.cpp b/clients/roscpp/src/libros/init.cpp index 91e0a58348..7a38aaab1a 100644 --- a/clients/roscpp/src/libros/init.cpp +++ b/clients/roscpp/src/libros/init.cpp @@ -316,17 +316,6 @@ void start() } } - char* env_ipv6 = NULL; -#ifdef _MSC_VER - _dupenv_s(&env_ipv6, NULL, "ROS_IPV6"); -#else - env_ipv6 = getenv("ROS_IPV6"); -#endif - - bool use_ipv6 = (env_ipv6 && strcmp(env_ipv6,"on") == 0); - TransportTCP::s_use_ipv6_ = use_ipv6; - XmlRpc::XmlRpcSocket::s_use_ipv6_ = use_ipv6; - #ifdef _MSC_VER if (env_ipv6) { @@ -430,6 +419,19 @@ void start() } } +void check_ipv6_environment() { + char* env_ipv6 = NULL; +#ifdef _MSC_VER + _dupenv_s(&env_ipv6, NULL, "ROS_IPV6"); +#else + env_ipv6 = getenv("ROS_IPV6"); +#endif + + bool use_ipv6 = (env_ipv6 && strcmp(env_ipv6,"on") == 0); + TransportTCP::s_use_ipv6_ = use_ipv6; + XmlRpc::XmlRpcSocket::s_use_ipv6_ = use_ipv6; +} + void init(const M_string& remappings, const std::string& name, uint32_t options) { if (!g_atexit_registered) @@ -453,6 +455,7 @@ void init(const M_string& remappings, const std::string& name, uint32_t options) #ifndef WIN32 signal(SIGPIPE, SIG_IGN); #endif + check_ipv6_environment(); network::init(remappings); master::init(remappings); // names:: namespace is initialized by this_node diff --git a/clients/roscpp/src/libros/io.cpp b/clients/roscpp/src/libros/io.cpp index 6f0c242865..66c4a9993a 100644 --- a/clients/roscpp/src/libros/io.cpp +++ b/clients/roscpp/src/libros/io.cpp @@ -35,6 +35,8 @@ ** Includes *****************************************************************************/ +#include "config.h" + #include #include // don't need if we dont call the pipe functions. #include // for EFAULT and co. @@ -46,6 +48,16 @@ #include // for non-blocking configuration #endif +#ifdef HAVE_EPOLL + #include +#endif + +/***************************************************************************** +** Macros +*****************************************************************************/ + +#define UNUSED(expr) do { (void)(expr); } while (0) + /***************************************************************************** ** Namespaces *****************************************************************************/ @@ -87,6 +99,69 @@ bool last_socket_error_is_would_block() { #endif } +int create_socket_watcher() { + int epfd = -1; +#if defined(HAVE_EPOLL) + epfd = ::epoll_create1(0); + if (epfd < 0) + { + ROS_ERROR("Unable to create epoll watcher: %s", strerror(errno)); + } +#endif + return epfd; +} + +void close_socket_watcher(int fd) { + if (fd >= 0) + ::close(fd); +} + +void add_socket_to_watcher(int epfd, int fd) { +#if defined(HAVE_EPOLL) + struct epoll_event ev; + ev.events = 0; + ev.data.fd = fd; + + if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev)) + { + ROS_ERROR("Unable to add FD to epoll: %s", strerror(errno)); + } +#else + UNUSED(epfd); + UNUSED(fd); +#endif +} + +void del_socket_from_watcher(int epfd, int fd) { +#if defined(HAVE_EPOLL) + if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL)) + { + ROS_ERROR("Unable to remove FD to epoll: %s", strerror(errno)); + } +#else + UNUSED(epfd); + UNUSED(fd); +#endif +} + +void set_events_on_socket(int epfd, int fd, int events) { +#if defined(HAVE_EPOLL) + struct epoll_event ev; + ev.events = events; + ev.data.fd = fd; + if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev)) + { + ROS_ERROR("Unable to modify FD epoll: %s", strerror(errno)); + } +#else + UNUSED(epfd); + UNUSED(fd); + UNUSED(events); +#endif +} + + + /***************************************************************************** ** Service Robotics/Libssh Functions *****************************************************************************/ @@ -96,22 +171,26 @@ bool last_socket_error_is_would_block() { * Windows doesn't have a polling function until Vista (WSAPoll) and even then * its implementation is not supposed to be great. This works for a broader * range of platforms and will suffice. + * @param epfd - the socket watcher to poll on. * @param fds - the socket set (descriptor's and events) to poll for. * @param nfds - the number of descriptors to poll for. * @param timeout - timeout in milliseconds. - * @return int : -1 on error, 0 on timeout, +ve number of structures with received events. + * @return pollfd_vector_ptr : NULL on error, empty on timeout, a list of structures with received events. */ -int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { +pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) { #if defined(WIN32) fd_set readfds, writefds, exceptfds; struct timeval tv, *ptv; socket_fd_t max_fd; int rc; nfds_t i; + boost::shared_ptr > ofds; + + UNUSED(epfd); if (fds == NULL) { errno = EFAULT; - return -1; + return ofds; } FD_ZERO (&readfds); @@ -146,7 +225,7 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { if (rc == -1) { errno = EINVAL; - return -1; + return ofds; } /********************* ** Setting the timeout @@ -166,9 +245,11 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv); if (rc < 0) { - return -1; - } else if ( rc == 0 ) { - return 0; + return ofds; + } + ofds.reset(new std::vector); + if ( rc == 0 ) { + return ofds; } for (rc = 0, i = 0; i < nfds; i++) { @@ -213,18 +294,67 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) { } else { fds[i].revents = POLLNVAL; } + ofds->push_back(fds[i]); } - return rc; + return ofds; +#elif defined (HAVE_EPOLL) + UNUSED(nfds); + UNUSED(fds); + struct epoll_event ev[nfds]; + pollfd_vector_ptr ofds; + + int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout); + + if (fd_cnt < 0) + { + // EINTR means that we got interrupted by a signal, and is not an error + if(errno != EINTR) { + ROS_ERROR("Error in epoll_wait! %s", strerror(errno)); + ofds.reset(); + } + } + else + { + ofds.reset(new std::vector); + for (int i = 0; i < fd_cnt; i++) + { + socket_pollfd pfd; + pfd.fd = ev[i].data.fd; + pfd.revents = ev[i].events; + ofds->push_back(pfd); + } + } + return ofds; #else + UNUSED(epfd); + pollfd_vector_ptr ofds(new std::vector); + // Clear the `revents` fields + for (nfds_t i = 0; i < nfds; i++) + { + fds[i].revents = 0; + } + // use an existing poll implementation int result = poll(fds, nfds, timeout); - if ( result < 0 ) { + if ( result < 0 ) + { // EINTR means that we got interrupted by a signal, and is not an error - if(errno == EINTR) { - result = 0; + if(errno != EINTR) + { + ROS_ERROR("Error in poll! %s", strerror(errno)); + ofds.reset(); + } + } else { + for (nfds_t i = 0; i < nfds; i++) + { + if (fds[i].revents) + { + ofds->push_back(fds[i]); + fds[i].revents = 0; + } } } - return result; + return ofds; #endif // poll_sockets functions } /***************************************************************************** diff --git a/clients/roscpp/src/libros/poll_set.cpp b/clients/roscpp/src/libros/poll_set.cpp index 647e857c85..7ae227d48d 100644 --- a/clients/roscpp/src/libros/poll_set.cpp +++ b/clients/roscpp/src/libros/poll_set.cpp @@ -47,7 +47,7 @@ namespace ros { PollSet::PollSet() -: sockets_changed_(false) + : sockets_changed_(false), epfd_(create_socket_watcher()) { if ( create_signal_pair(signal_pipe_) != 0 ) { ROS_FATAL("create_signal_pair() failed"); @@ -60,6 +60,7 @@ PollSet::PollSet() PollSet::~PollSet() { close_signal_pair(signal_pipe_); + close_socket_watcher(epfd_); } bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport) @@ -80,6 +81,8 @@ bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const Trans return false; } + add_socket_to_watcher(epfd_, fd); + sockets_changed_ = true; } @@ -106,6 +109,8 @@ bool PollSet::delSocket(int fd) just_deleted_.push_back(fd); } + del_socket_from_watcher(epfd_, fd); + sockets_changed_ = true; signal(); @@ -132,6 +137,8 @@ bool PollSet::addEvents(int sock, int events) it->second.events_ |= events; + set_events_on_socket(epfd_, sock, it->second.events_); + signal(); return true; @@ -152,6 +159,8 @@ bool PollSet::delEvents(int sock, int events) return false; } + set_events_on_socket(epfd_, sock, it->second.events_); + signal(); return true; @@ -177,28 +186,28 @@ void PollSet::update(int poll_timeout) createNativePollset(); // Poll across the sockets we're servicing - int ret; - size_t ufds_count = ufds_.size(); - if((ret = poll_sockets(&ufds_.front(), ufds_count, poll_timeout)) < 0) + boost::shared_ptr > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout); + if (!ofds) { - ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string()); - } - else if (ret > 0) // ret = 0 implies the poll timed out, nothing to do + ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string()); + } + else { - // We have one or more sockets to service - for(size_t i=0; i::iterator it = ofds->begin() ; it != ofds->end(); ++it) { - if (ufds_[i].revents == 0) - { - continue; - } - + int fd = it->fd; + int revents = it->revents; SocketUpdateFunc func; TransportPtr transport; int events = 0; + + if (revents == 0) + { + continue; + } { boost::mutex::scoped_lock lock(socket_info_mutex_); - M_SocketInfo::iterator it = socket_info_.find(ufds_[i].fd); + M_SocketInfo::iterator it = socket_info_.find(fd); // the socket has been entirely deleted if (it == socket_info_.end()) { @@ -215,7 +224,6 @@ void PollSet::update(int poll_timeout) // If these are registered events for this socket, OR the events are ERR/HUP/NVAL, // call through to the registered function - int revents = ufds_[i].revents; if (func && ((events & revents) || (revents & POLLERR) @@ -231,7 +239,7 @@ void PollSet::update(int poll_timeout) // we ignore the first instance of one of these errors. If it's a real error we'll // hit it again next time through. boost::mutex::scoped_lock lock(just_deleted_mutex_); - if (std::find(just_deleted_.begin(), just_deleted_.end(), ufds_[i].fd) != just_deleted_.end()) + if (std::find(just_deleted_.begin(), just_deleted_.end(), fd) != just_deleted_.end()) { skip = true; } @@ -242,13 +250,12 @@ void PollSet::update(int poll_timeout) func(revents & (events|POLLERR|POLLHUP|POLLNVAL)); } } - - ufds_[i].revents = 0; } - - boost::mutex::scoped_lock lock(just_deleted_mutex_); - just_deleted_.clear(); } + + boost::mutex::scoped_lock lock(just_deleted_mutex_); + just_deleted_.clear(); + } void PollSet::createNativePollset() @@ -272,6 +279,7 @@ void PollSet::createNativePollset() pfd.events = info.events_; pfd.revents = 0; } + sockets_changed_ = false; } void PollSet::onLocalPipeEvents(int events) diff --git a/clients/roscpp/src/libros/spinner.cpp b/clients/roscpp/src/libros/spinner.cpp index 4bae89a236..9b1f8d0a6b 100644 --- a/clients/roscpp/src/libros/spinner.cpp +++ b/clients/roscpp/src/libros/spinner.cpp @@ -35,9 +35,8 @@ namespace { const std::string DEFAULT_ERROR_MESSAGE = - "\nAttempt to spin a callback queue from two spinners, one of them being single-threaded." - "\nThis will probably result in callbacks being executed out-of-order." - "\nIn future this will throw an exception!"; + "Attempt to spin a callback queue from two spinners, one of them being single-threaded." + "\nIn the future this will throw an exception!"; /** class to monitor running single-threaded spinners. * @@ -150,7 +149,7 @@ void SingleThreadedSpinner::spin(CallbackQueue* queue) if (!spinner_monitor.add(queue, true)) { - ROS_ERROR_STREAM("SingleThreadedSpinner: " << DEFAULT_ERROR_MESSAGE); + ROS_ERROR_STREAM("SingleThreadedSpinner: " << DEFAULT_ERROR_MESSAGE + " You might want to use a MultiThreadedSpinner instead."); return; } diff --git a/clients/rospy/src/rospy/msg.py b/clients/rospy/src/rospy/msg.py index c7fd8006c9..27204562fd 100644 --- a/clients/rospy/src/rospy/msg.py +++ b/clients/rospy/src/rospy/msg.py @@ -69,7 +69,7 @@ def __init__(self, *args): def serialize(self, buff): """AnyMsg provides an implementation so that a node can forward messages w/o (de)serialization""" if self._buff is None: - raise rospy.exceptions("AnyMsg is not initialized") + raise rospy.exceptions.ROSException("AnyMsg is not initialized") else: buff.write(self._buff) diff --git a/test/test_roscpp/test/test_poll_set.cpp b/test/test_roscpp/test/test_poll_set.cpp index 046e1b3fd0..61a20088b9 100644 --- a/test/test_roscpp/test/test_poll_set.cpp +++ b/test/test_roscpp/test/test_poll_set.cpp @@ -256,7 +256,16 @@ void delThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier) ps->delSocket(sh->socket_); } -TEST_F(Poller, addDelMultiThread) +/** + * This test has been disabled. The underlying logic which it tests has three + * different implementations (poll, epoll, Windows), and development of the epoll + * version exposed that the test was validating a buggy aspect of the original + * poll version. To reenable this test, the poll version and the test would both + * have to be updated. + * + * For more discussion, see: https://github.com/ros/ros_comm/pull/1217 + */ +TEST_F(Poller, DISABLED_addDelMultiThread) { for (int i = 0; i < 100; ++i) { diff --git a/test/test_rospy/test/unit/test_rospy_api.py b/test/test_rospy/test/unit/test_rospy_api.py index fe541d8765..1c0fd19131 100644 --- a/test/test_rospy/test/unit/test_rospy_api.py +++ b/test/test_rospy/test/unit/test_rospy_api.py @@ -66,12 +66,13 @@ def test_anymsg(self): except ImportError: from io import StringIO import rospy + import rospy.exceptions #trip wires against AnyMsg API m = rospy.AnyMsg() try: m.serialize(StringIO()) self.fail("AnyMsg should not allow serialization") - except: + except rospy.exceptions.ROSException: pass teststr = 'foostr-%s'%time.time() diff --git a/tools/rosbag/src/record.cpp b/tools/rosbag/src/record.cpp index dda378fe12..b624e4e986 100644 --- a/tools/rosbag/src/record.cpp +++ b/tools/rosbag/src/record.cpp @@ -62,7 +62,7 @@ rosbag::RecorderOptions parseOptions(int argc, char** argv) { ("bz2,j", "use BZ2 compression") ("lz4", "use LZ4 compression") ("split", po::value()->implicit_value(0), "Split the bag file and continue recording when maximum size or maximum duration reached.") - ("max-splits", po::value()->default_value(0), "Keep a maximum of N bag files, when reaching the maximum erase the oldest one to keep a constant number of files.") + ("max-splits", po::value(), "Keep a maximum of N bag files, when reaching the maximum erase the oldest one to keep a constant number of files.") ("topic", po::value< std::vector >(), "topic to record") ("size", po::value(), "The maximum size of the bag to record in MB.") ("duration", po::value(), "Record a bag of maximum duration in seconds, unless 'm', or 'h' is appended.") diff --git a/tools/rosbag/src/recorder.cpp b/tools/rosbag/src/recorder.cpp index a4e995cf11..86cd8cade1 100644 --- a/tools/rosbag/src/recorder.cpp +++ b/tools/rosbag/src/recorder.cpp @@ -108,6 +108,7 @@ RecorderOptions::RecorderOptions() : limit(0), split(false), max_size(0), + max_splits(0), max_duration(-1.0), node(""), min_space(1024 * 1024 * 1024), diff --git a/tools/rosbag/src/rosbag/rosbag_main.py b/tools/rosbag/src/rosbag/rosbag_main.py index c14a002ce0..af99fbff15 100644 --- a/tools/rosbag/src/rosbag/rosbag_main.py +++ b/tools/rosbag/src/rosbag/rosbag_main.py @@ -170,10 +170,13 @@ def info_cmd(argv): except ROSBagUnindexedException as ex: print('ERROR bag unindexed: %s. Run rosbag reindex.' % arg, file=sys.stderr) + sys.exit(1) except ROSBagException as ex: print('ERROR reading %s: %s' % (arg, str(ex)), file=sys.stderr) + sys.exit(1) except IOError as ex: print('ERROR reading %s: %s' % (arg, str(ex)), file=sys.stderr) + sys.exit(1) def handle_topics(option, opt_str, value, parser): @@ -326,7 +329,7 @@ def eval_fn(topic, m, t): inbag = Bag(inbag_filename) except ROSBagUnindexedException as ex: print('ERROR bag unindexed: %s. Run rosbag reindex.' % inbag_filename, file=sys.stderr) - return + sys.exit(1) try: meter = ProgressMeter(outbag_filename, inbag._uncompressed_size) @@ -424,7 +427,7 @@ def fix_cmd(argv): except ROSBagUnindexedException as ex: print('ERROR bag unindexed: %s. Run rosbag reindex.' % inbag_filename, file=sys.stderr) - return + sys.exit(1) if len(migrations) == 0: os.rename(outname, outbag_filename) @@ -441,6 +444,7 @@ def fix_cmd(argv): print('Try running \'rosbag check\' to create the necessary rule files or run \'rosbag fix\' with the \'--force\' option.') os.remove(outname) + sys.exit(1) def check_cmd(argv): parser = optparse.OptionParser(usage='rosbag check BAG [-g RULEFILE] [EXTRARULES1 EXTRARULES2 ...]', description='Determine whether a bag is playable in the current system, or if it can be migrated.') @@ -470,7 +474,7 @@ def check_cmd(argv): Bag(args[0]) except ROSBagUnindexedException as ex: print('ERROR bag unindexed: %s. Run rosbag reindex.' % args[0], file=sys.stderr) - return + sys.exit(1) mm = MessageMigrator(args[1:] + append_rule, not options.noplugins) diff --git a/tools/rosbag_storage/src/view.cpp b/tools/rosbag_storage/src/view.cpp index dbb1e72478..dd9fc4842b 100644 --- a/tools/rosbag_storage/src/view.cpp +++ b/tools/rosbag_storage/src/view.cpp @@ -281,10 +281,11 @@ void View::updateQueries(BagQuery* q) { multiset const& index = j->second; // lower_bound/upper_bound do a binary search to find the appropriate range of Index Entries given our time range - - std::multiset::const_iterator begin = std::lower_bound(index.begin(), index.end(), q->query.getStartTime(), IndexEntryCompare()); - std::multiset::const_iterator end = std::upper_bound(index.begin(), index.end(), q->query.getEndTime(), IndexEntryCompare()); - + IndexEntry start_time_lookup_entry = { q->query.getStartTime(), 0, 0 }; + IndexEntry end_time_lookup_entry = { q->query.getEndTime() , 0, 0 }; + std::multiset::const_iterator begin = index.lower_bound(start_time_lookup_entry); + std::multiset::const_iterator end = index.upper_bound(end_time_lookup_entry); + // Make sure we are at the right beginning while (begin != index.begin() && begin->time >= q->query.getStartTime()) { diff --git a/tools/roslaunch/src/roslaunch/nodeprocess.py b/tools/roslaunch/src/roslaunch/nodeprocess.py index cc86a4ad13..938db09b1a 100644 --- a/tools/roslaunch/src/roslaunch/nodeprocess.py +++ b/tools/roslaunch/src/roslaunch/nodeprocess.py @@ -95,7 +95,7 @@ def create_master_process(run_id, type_, ros_root, port, num_workers=NUM_WORKERS _logger.info("process[master]: launching with args [%s]"%args) log_output = False - return LocalProcess(run_id, package, 'master', args, os.environ, log_output, None) + return LocalProcess(run_id, package, 'master', args, os.environ, log_output, None, required=True) def create_node_process(run_id, node, master_uri): """ diff --git a/tools/roslaunch/src/roslaunch/parent.py b/tools/roslaunch/src/roslaunch/parent.py index ac27b83069..5e5cf34f4a 100644 --- a/tools/roslaunch/src/roslaunch/parent.py +++ b/tools/roslaunch/src/roslaunch/parent.py @@ -286,7 +286,10 @@ def start(self, auto_terminate=True): if self.process_listeners: for l in self.process_listeners: self.runner.pm.add_process_listener(l) - + # Add listeners to server as well, otherwise they won't be + # called when a node on a remote machine dies. + self.server.add_process_listener(l) + def spin_once(self): """ Run the parent roslaunch event loop once diff --git a/tools/rosmaster/src/rosmaster/util.py b/tools/rosmaster/src/rosmaster/util.py index 6fb469d781..ee3a4848a0 100644 --- a/tools/rosmaster/src/rosmaster/util.py +++ b/tools/rosmaster/src/rosmaster/util.py @@ -70,6 +70,8 @@ def xmlrpcapi(uri): def close_half_closed_sockets(): + if not hasattr(socket, 'TCP_INFO'): + return for proxy in _proxies.values(): transport = proxy("transport") if transport._connection and transport._connection[1] is not None and transport._connection[1].sock is not None: diff --git a/tools/rosnode/src/rosnode/__init__.py b/tools/rosnode/src/rosnode/__init__.py index f7bf80ad61..4c8322902f 100644 --- a/tools/rosnode/src/rosnode/__init__.py +++ b/tools/rosnode/src/rosnode/__init__.py @@ -84,7 +84,7 @@ def _succeed(args): _caller_apis = {} def get_api_uri(master, caller_id, skip_cache=False): """ - @param master: XMLRPC handle to ROS Master + @param master: rosgraph Master instance @type master: rosgraph.Master @param caller_id: node name @type caller_id: str @@ -399,9 +399,9 @@ def rosnode_ping_all(verbose=False): def cleanup_master_blacklist(master, blacklist): """ - Remove registrations from ROS Master that do not match blacklist. - @param master: XMLRPC handle to ROS Master - @type master: xmlrpclib.ServerProxy + Remove registrations from ROS Master that match blacklist. + @param master: rosgraph Master instance + @type master: rosgraph.Master @param blacklist: list of nodes to scrub @type blacklist: [str] """ @@ -426,8 +426,8 @@ def cleanup_master_blacklist(master, blacklist): def cleanup_master_whitelist(master, whitelist): """ Remove registrations from ROS Master that do not match whitelist. - @param master: XMLRPC handle to ROS Master - @type master: xmlrpclib.ServerProxy + @param master: rosgraph Master instance + @type master: rosgraph.Master @param whitelist: list of nodes to keep @type whitelist: list of nodes to keep """ diff --git a/tools/rostest/cmake/rostest-extras.cmake.em b/tools/rostest/cmake/rostest-extras.cmake.em index 324f3b5f3d..7cd908804d 100644 --- a/tools/rostest/cmake/rostest-extras.cmake.em +++ b/tools/rostest/cmake/rostest-extras.cmake.em @@ -57,12 +57,41 @@ function(add_rostest file) catkin_run_tests_target("rostest" ${_testname} "rostest-${_output_name}" COMMAND ${cmd} WORKING_DIRECTORY ${_rostest_WORKING_DIRECTORY} DEPENDENCIES ${_rostest_DEPENDENCIES}) endfunction() +# This is an internal function, use add_rostest_gtest or +# add_rostest_gmock instead +# +# :param type: "gtest" or "gmock" +# The remaining arguments are the same as for add_rostest_gtest +# and add_rostest_gmock +# +function(_add_rostest_google_test type target launch_file) + if (NOT "${type}" STREQUAL "gtest" AND NOT "${type}" STREQUAL "gmock") + message(FATAL_ERROR + "Invalid use of _add_rostest_google_test function, " + "first argument must be 'gtest' or 'gmock'") + return() + endif() + string(TOUPPER "${type}" type_upper) + if("${ARGN}" STREQUAL "") + message(FATAL_ERROR "add_rostest_${type}() needs at least one file argument to compile a ${type_upper} executable") + endif() + if(${type_upper}_FOUND) + include_directories(${${type_upper}_INCLUDE_DIRS}) + add_executable(${target} EXCLUDE_FROM_ALL ${ARGN}) + target_link_libraries(${target} ${${type_upper}_LIBRARIES}) + if(TARGET tests) + add_dependencies(tests ${target}) + endif() + add_rostest(${launch_file} DEPENDENCIES ${target}) + endif() +endfunction() + # # Register the launch file with add_rostest() and compile all # passed files into a GTest binary. # # .. note:: The function does nothing if GTest was not found. The -# target is only compiled when tests are build and linked against +# target is only compiled when tests are built and linked against # the GTest libraries. # # :param target: target name of the GTest executable @@ -73,18 +102,26 @@ endfunction() # :type ARGN: list of files # function(add_rostest_gtest target launch_file) - if("${ARGN}" STREQUAL "") - message(FATAL_ERROR "add_rostest_gtest() needs at least one file argument to compile a GTest executable") - endif() - if(GTEST_FOUND) - include_directories(${GTEST_INCLUDE_DIRS}) - add_executable(${target} EXCLUDE_FROM_ALL ${ARGN}) - target_link_libraries(${target} ${GTEST_LIBRARIES}) - if(TARGET tests) - add_dependencies(tests ${target}) - endif() - add_rostest(${launch_file} DEPENDENCIES ${target}) - endif() + _add_rostest_google_test("gtest" ${target} ${launch_file} ${ARGN}) +endfunction() + +# +# Register the launch file with add_rostest() and compile all +# passed files into a GMock binary. +# +# .. note:: The function does nothing if GMock was not found. The +# target is only compiled when tests are built and linked against +# the GMock libraries. +# +# :param target: target name of the GMock executable +# :type target: string +# :param launch_file: the relative path to the roslaunch file +# :type launch_file: string +# :param ARGN: the files to compile into a GMock executable +# :type ARGN: list of files +# +function(add_rostest_gmock target launch_file) + _add_rostest_google_test("gmock" ${target} ${launch_file} ${ARGN}) endfunction() macro(rostest__strip_prefix var prefix) diff --git a/tools/rostest/package.xml b/tools/rostest/package.xml index 6aa96dd3c6..e4b76f8fdf 100644 --- a/tools/rostest/package.xml +++ b/tools/rostest/package.xml @@ -10,7 +10,7 @@ http://ros.org/wiki/rostest Ken Conley - catkin + catkin boost rosunit diff --git a/tools/topic_tools/test/test_mux_services.py b/tools/topic_tools/test/test_mux_services.py index dd8d6c2603..6875259e0f 100755 --- a/tools/topic_tools/test/test_mux_services.py +++ b/tools/topic_tools/test/test_mux_services.py @@ -52,7 +52,7 @@ def make_srv_proxies(self): rospy.wait_for_service('mux/list', 5) rospy.wait_for_service('mux/select', 5) except rospy.ROSException as e: - self.fail('failed to find a required service: ' + `e`) + self.fail('failed to find a required service: ' + repr(e)) add_srv = rospy.ServiceProxy('mux/add', MuxAdd) delete_srv = rospy.ServiceProxy('mux/delete', MuxDelete) diff --git a/utilities/roslz4/src/lz4s.c b/utilities/roslz4/src/lz4s.c index 05f16eabfc..8db1dfc9e1 100644 --- a/utilities/roslz4/src/lz4s.c +++ b/utilities/roslz4/src/lz4s.c @@ -573,7 +573,10 @@ int roslz4_buffToBuffCompress(char *input, unsigned int input_size, int ret; ret = roslz4_compressStart(&stream, block_size_id); - if (ret != ROSLZ4_OK) { return ret; } + if (ret != ROSLZ4_OK) { + roslz4_compressEnd(&stream); + return ret; + } while (stream.input_left > 0 && ret != ROSLZ4_STREAM_END) { ret = roslz4_compress(&stream, ROSLZ4_FINISH); diff --git a/utilities/xmlrpcpp/CMakeLists.txt b/utilities/xmlrpcpp/CMakeLists.txt index b69969b321..0304abefd8 100644 --- a/utilities/xmlrpcpp/CMakeLists.txt +++ b/utilities/xmlrpcpp/CMakeLists.txt @@ -5,13 +5,13 @@ if(NOT WIN32) set_directory_properties(PROPERTIES COMPILE_OPTIONS "-Wall;-Wextra") endif() -find_package(catkin REQUIRED COMPONENTS cpp_common) +find_package(catkin REQUIRED COMPONENTS cpp_common rostime) # The CFG_EXTRAS is only for compatibility, to be removed in Lunar. catkin_package( INCLUDE_DIRS include LIBRARIES xmlrpcpp - CATKIN_DEPENDS cpp_common + CATKIN_DEPENDS cpp_common rostime CFG_EXTRAS xmlrpcpp-extras.cmake ) @@ -34,6 +34,7 @@ add_library(xmlrpcpp src/XmlRpcValue.cpp ) +target_link_libraries(xmlrpcpp ${catkin_LIBRARIES}) if(WIN32) target_link_libraries(xmlrpcpp ws2_32) endif() diff --git a/utilities/xmlrpcpp/cmake/xmlrpcpp-extras.cmake.develspace.in b/utilities/xmlrpcpp/cmake/xmlrpcpp-extras.cmake.develspace.in new file mode 100644 index 0000000000..f5ea2cf960 --- /dev/null +++ b/utilities/xmlrpcpp/cmake/xmlrpcpp-extras.cmake.develspace.in @@ -0,0 +1 @@ +list(APPEND @PROJECT_NAME@_INCLUDE_DIRS "@xmlrpcpp_SOURCE_DIR@/include/xmlrpcpp") diff --git a/utilities/xmlrpcpp/cmake/xmlrpcpp-extras.cmake.in b/utilities/xmlrpcpp/cmake/xmlrpcpp-extras.cmake.installspace.in similarity index 100% rename from utilities/xmlrpcpp/cmake/xmlrpcpp-extras.cmake.in rename to utilities/xmlrpcpp/cmake/xmlrpcpp-extras.cmake.installspace.in diff --git a/utilities/xmlrpcpp/include/xmlrpcpp/XmlRpcDispatch.h b/utilities/xmlrpcpp/include/xmlrpcpp/XmlRpcDispatch.h index 5e908228c4..f5dba9c8d2 100644 --- a/utilities/xmlrpcpp/include/xmlrpcpp/XmlRpcDispatch.h +++ b/utilities/xmlrpcpp/include/xmlrpcpp/XmlRpcDispatch.h @@ -58,7 +58,7 @@ namespace XmlRpc { //! Clear all sources from the monitored sources list. Sources are closed. void clear(); - // helper + // helper returning current steady/monotonic time double getTime(); // A source to monitor and what to monitor it for diff --git a/utilities/xmlrpcpp/package.xml b/utilities/xmlrpcpp/package.xml index 160f0e35e8..a385612773 100644 --- a/utilities/xmlrpcpp/package.xml +++ b/utilities/xmlrpcpp/package.xml @@ -1,4 +1,4 @@ - + xmlrpcpp 1.12.12 @@ -17,9 +17,8 @@ catkin - cpp_common - - cpp_common + cpp_common + rostime diff --git a/utilities/xmlrpcpp/src/XmlRpcDispatch.cpp b/utilities/xmlrpcpp/src/XmlRpcDispatch.cpp index fd171adbc0..d78c770362 100644 --- a/utilities/xmlrpcpp/src/XmlRpcDispatch.cpp +++ b/utilities/xmlrpcpp/src/XmlRpcDispatch.cpp @@ -3,6 +3,8 @@ #include "xmlrpcpp/XmlRpcSource.h" #include "xmlrpcpp/XmlRpcUtil.h" +#include "ros/time.h" + #include #include #include @@ -62,7 +64,7 @@ XmlRpcDispatch::removeSource(XmlRpcSource* source) // Modify the types of events to watch for on this source -void +void XmlRpcDispatch::setSourceEvents(XmlRpcSource* source, unsigned eventMask) { for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it) @@ -219,11 +221,10 @@ XmlRpcDispatch::getTime() return ((double) tbuff.time + ((double)tbuff.millitm / 1000.0) + ((double) tbuff.timezone * 60)); #else - struct timeval tv; - struct timezone tz; + uint32_t sec, nsec; - gettimeofday(&tv, &tz); - return (tv.tv_sec + tv.tv_usec / 1000000.0); + ros::ros_steadytime(sec, nsec); + return ((double)sec + (double)nsec / 1e9); #endif /* USE_FTIME */ }