diff --git a/clients/roscpp/src/libros/callback_queue.cpp b/clients/roscpp/src/libros/callback_queue.cpp index c38fcbde95..cef18e11d7 100644 --- a/clients/roscpp/src/libros/callback_queue.cpp +++ b/clients/roscpp/src/libros/callback_queue.cpp @@ -124,7 +124,10 @@ void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t r callbacks_.push_back(info); } - condition_.notify_one(); + if (callback->ready()) + { + condition_.notify_one(); + } } CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id) @@ -225,6 +228,8 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout) return Disabled; } + ros::SteadyTime start_time(ros::SteadyTime::now()); + if (callbacks_.empty()) { if (!timeout.isZero()) @@ -266,6 +271,17 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout) if (!cb_info.callback) { + // do not spend more than `timeout` seconds in the callback; we already waited for some time when waiting for + // nonempty queue + ros::SteadyTime now(ros::SteadyTime::now()); + ros::WallDuration time_spent = now - start_time; + ros::WallDuration time_to_wait = timeout - time_spent; + + if (time_to_wait.toNSec() > 0) + { + condition_.wait_for(lock, boost::chrono::nanoseconds(time_to_wait.toNSec())); + } + return TryAgain; } @@ -391,6 +407,10 @@ CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls) { tls->cb_it = tls->callbacks.erase(tls->cb_it); result = cb->call(); + if (result == CallbackInterface::Success) + { + condition_.notify_one(); + } } } diff --git a/clients/roscpp/src/libros/subscription_queue.cpp b/clients/roscpp/src/libros/subscription_queue.cpp index 67b325506b..5f1117a5c2 100644 --- a/clients/roscpp/src/libros/subscription_queue.cpp +++ b/clients/roscpp/src/libros/subscription_queue.cpp @@ -169,7 +169,12 @@ CallbackInterface::CallResult SubscriptionQueue::call() bool SubscriptionQueue::ready() { - return true; + if (allow_concurrent_callbacks_) + { + return true; + } + boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::try_to_lock); + return lock.owns_lock(); } bool SubscriptionQueue::full() diff --git a/test/test_roscpp/test/fake_message.h b/test/test_roscpp/test/fake_message.h new file mode 100644 index 0000000000..3e58767125 --- /dev/null +++ b/test/test_roscpp/test/fake_message.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2009, Willow Garage, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* Author: Josh Faust */ + +/* + * Subscription queue test helper classes + */ +#ifndef TEST_ROSCPP_FAKE_MESSAGE_H +#define TEST_ROSCPP_FAKE_MESSAGE_H + +#include "ros/subscription_callback_helper.h" + +class FakeMessage +{ +public: + virtual const std::string __getDataType() const { return ""; } + virtual const std::string __getMD5Sum() const { return ""; } + virtual const std::string __getMessageDefinition() const { return ""; } + virtual uint32_t serializationLength() const { return 0; } + virtual uint8_t *serialize(uint8_t *write_ptr, uint32_t seq) const { (void)seq; return write_ptr; } + virtual uint8_t *deserialize(uint8_t *read_ptr) { return read_ptr; } +}; + +class FakeSubHelper : public ros::SubscriptionCallbackHelper +{ +public: + FakeSubHelper() + : calls_(0) + {} + + virtual ros::VoidConstPtr deserialize(const ros::SubscriptionCallbackHelperDeserializeParams&) + { + return boost::make_shared(); + } + + virtual std::string getMD5Sum() { return ""; } + virtual std::string getDataType() { return ""; } + + virtual void call(ros::SubscriptionCallbackHelperCallParams& params) + { + (void)params; + { + boost::mutex::scoped_lock lock(mutex_); + ++calls_; + } + + if (cb_) + { + cb_(); + } + } + + virtual const std::type_info& getTypeInfo() { return typeid(FakeMessage); } + virtual bool isConst() { return true; } + virtual bool hasHeader() { return false; } + + boost::mutex mutex_; + uint32_t calls_; + + boost::function cb_; +}; +typedef boost::shared_ptr FakeSubHelperPtr; + +#endif // TEST_ROSCPP_FAKE_MESSAGE_H diff --git a/test/test_roscpp/test/test_callback_queue.cpp b/test/test_roscpp/test/test_callback_queue.cpp index b1c51a79eb..648fb6ac81 100644 --- a/test/test_roscpp/test/test_callback_queue.cpp +++ b/test/test_roscpp/test/test_callback_queue.cpp @@ -36,6 +36,9 @@ #include #include #include +#include +#include +#include #include #include @@ -44,6 +47,8 @@ #include #include +#include "fake_message.h" + using namespace ros; class CountingCallback : public CallbackInterface @@ -258,31 +263,51 @@ TEST(CallbackQueue, recursive4) EXPECT_EQ(cb->count, 3U); } -void callAvailableThread(CallbackQueue* queue, bool& done) +void callAvailableThread( + CallbackQueue* queue, bool& done, boost::atomic* num_calls, + ros::WallDuration call_timeout = ros::WallDuration(0.1)) { + size_t i = 0; while (!done) { - queue->callAvailable(ros::WallDuration(0.1)); + queue->callAvailable(call_timeout); + ++i; + } + + if (num_calls) + { + num_calls->fetch_add(i); } } -size_t runThreadedTest(const CountingCallbackPtr& cb, const boost::function& threadFunc) +size_t runThreadedTest( + const CallbackInterfacePtr& cb, + const boost::function*, ros::WallDuration)>& threadFunc, + size_t* num_calls = NULL, size_t num_threads = 10, + ros::WallDuration duration = ros::WallDuration(5), + ros::WallDuration pause_between_callbacks = ros::WallDuration(0), + ros::WallDuration call_one_timeout = ros::WallDuration(0.1)) { CallbackQueue queue; boost::thread_group tg; bool done = false; + boost::atomic calls(0); - for (uint32_t i = 0; i < 10; ++i) + for (uint32_t i = 0; i < num_threads; ++i) { - tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done))); + tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done), &calls, call_one_timeout)); } ros::WallTime start = ros::WallTime::now(); size_t i = 0; - while (ros::WallTime::now() - start < ros::WallDuration(5)) + while (ros::WallTime::now() - start < duration) { queue.addCallback(cb); ++i; + if (!pause_between_callbacks.isZero()) + { + pause_between_callbacks.sleep(); + } } while (!queue.isEmpty()) @@ -293,6 +318,9 @@ size_t runThreadedTest(const CountingCallbackPtr& cb, const boost::functioncount, i); } -void callOneThread(CallbackQueue* queue, bool& done) +void callOneThread( + CallbackQueue* queue, bool& done, boost::atomic* num_calls, + ros::WallDuration timeout = ros::WallDuration(0.1)) { + size_t i = 0; while (!done) { - queue->callOne(ros::WallDuration(0.1)); + queue->callOne(timeout); + ++i; + } + + if (num_calls) + { + num_calls->fetch_add(i); } } @@ -320,6 +357,112 @@ TEST(CallbackQueue, threadedCallOne) EXPECT_EQ(cb->count, i); } +class CountingSubscriptionQueue : public SubscriptionQueue +{ +public: + CountingSubscriptionQueue( + const std::string& topic, int32_t queue_size, + bool allow_concurrent_callbacks) + : SubscriptionQueue(topic, queue_size, allow_concurrent_callbacks), + ready_count(0) + {} + + virtual bool ready() + { + ready_count.fetch_add(1); + return SubscriptionQueue::ready(); + } + + boost::atomic ready_count; +}; +typedef boost::shared_ptr CountingSubscriptionQueuePtr; + +struct ThreadedCallOneSlowParams { + ros::WallDuration callback_duration; // long-lasting callback + size_t num_threads; + ros::WallDuration call_one_timeout; + ros::WallDuration test_duration; + ros::WallDuration pause_between_callbacks; +}; + +class CallbackQueueParamTest : public ::testing::TestWithParam +{}; + +TEST_P(CallbackQueueParamTest, threadedCallOneSlow) +{ + // test for https://github.com/ros/ros_comm/issues/1545 + // "roscpp multithreaded spinners eat up CPU when callbacks take too long" + + const ThreadedCallOneSlowParams param = GetParam(); + const WallDuration& callback_duration = param.callback_duration; // long-lasting callback + const size_t num_threads = param.num_threads; + const ros::WallDuration call_one_timeout = param.call_one_timeout; + const ros::WallDuration test_duration = param.test_duration; + const ros::WallDuration pause_between_callbacks = param.pause_between_callbacks; + // queue_size is chosen such that it is larger than the max number of callbacks we + // are really able to process in 5 secs (since allow_concurrent_callbacks is false, + // it is equal to the number of seconds the queue is running) + const size_t queue_size = static_cast(test_duration.toSec()) + 1; + + // create a subscription queue counting the number of ready() calls + const CountingSubscriptionQueuePtr cb( + boost::make_shared("test", queue_size, false)); + + // create a slow subscription callback (takes 1 second to complete) + const FakeSubHelperPtr helper(boost::make_shared()); + helper->cb_ = boost::bind(&ros::WallDuration::sleep, callback_duration); + const MessageDeserializerPtr des(boost::make_shared( + helper, SerializedMessage(), boost::shared_ptr())); + + // fill the subscription queue to get max performance + for (size_t i = 0; i < queue_size; ++i) + { + cb->push(helper, des, false, VoidConstWPtr(), true); + } + + // keep filling the callback queue at maximum speed for 5 seconds and + // spin up 10 processing threads until the queue is empty + size_t num_call_one_calls = 0; + const size_t num_callbacks_to_call = runThreadedTest( + cb, callOneThread, &num_call_one_calls, + num_threads, test_duration, pause_between_callbacks, call_one_timeout); + + const uint32_t num_callbacks_called = helper->calls_; + const size_t num_ready_called = cb->ready_count; + + // what should happen: even though we have multiple processing threads, + // the subscription queue has a per-topic lock which prevents multiple threads from + // processing the topic's callbacks simultaneously; so even though there were + // tens of thousands of callbacks in the callback queue, we only got time to process + // less than 5 of them because queue_size is quite limited (3), so most callbacks + // get thrown away during processing of the slow callbacks + + // we test the number of SubscriptionQueue::ready() calls to see how often do the + // idle threads ask for more work; with bug 1545 unfixed, this gets to millions of + // calls which acts as a busy-wait; with the bug fixed, the number should not be + // higher than number of callbacks (~ 80k), since each newly added callback should + // wake one idle thread and let it ask for work + + ROS_INFO_STREAM("Callback queue processed " << + num_callbacks_called << " callbacks out of " << num_callbacks_to_call); + ROS_INFO_STREAM("callOne() was called " << num_call_one_calls << " times."); + ROS_INFO_STREAM("ready() was called " << num_ready_called << " times."); + + EXPECT_EQ(num_callbacks_called, queue_size); + EXPECT_LE(num_call_one_calls, 2 * num_callbacks_to_call + num_threads * (1/call_one_timeout.toSec()) * queue_size); +} + +INSTANTIATE_TEST_CASE_P(slow, CallbackQueueParamTest, ::testing::Values( + //ThreadedCallOneSlowParams{callback_duration, num_threads, call_one_timeout, test_duration, pause_between_callbacks} + ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, + ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.1)}, + ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.001)}, + ThreadedCallOneSlowParams{ros::WallDuration(0.1), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, + ThreadedCallOneSlowParams{ros::WallDuration(0.001), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, + ThreadedCallOneSlowParams{ros::WallDuration(1.0), 1, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, + ThreadedCallOneSlowParams{ros::WallDuration(1.0), 2, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)} + )); + // this class is just an ugly hack // to access the constructor Timer(TimerOptions) namespace ros @@ -384,10 +527,11 @@ TEST(CallbackQueue, recursiveTimer) boost::thread_group tg; bool done = false; + boost::atomic calls(0); for (uint32_t i = 0; i < 2; ++i) { - tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done))); + tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done), &calls, ros::WallDuration(0.1))); } while (!queue.isEmpty()) diff --git a/test/test_roscpp/test/test_subscription_queue.cpp b/test/test_roscpp/test/test_subscription_queue.cpp index 9af94a2c40..a19cccd41f 100644 --- a/test/test_roscpp/test/test_subscription_queue.cpp +++ b/test/test_roscpp/test/test_subscription_queue.cpp @@ -44,58 +44,9 @@ #include #include -using namespace ros; - -class FakeMessage -{ -public: - virtual const std::string __getDataType() const { return ""; } - virtual const std::string __getMD5Sum() const { return ""; } - virtual const std::string __getMessageDefinition() const { return ""; } - virtual uint32_t serializationLength() const { return 0; } - virtual uint8_t *serialize(uint8_t *write_ptr, uint32_t seq) const { (void)seq; return write_ptr; } - virtual uint8_t *deserialize(uint8_t *read_ptr) { return read_ptr; } -}; - -class FakeSubHelper : public SubscriptionCallbackHelper -{ -public: - FakeSubHelper() - : calls_(0) - {} - - virtual VoidConstPtr deserialize(const SubscriptionCallbackHelperDeserializeParams&) - { - return boost::make_shared(); - } +#include "fake_message.h" - virtual std::string getMD5Sum() { return ""; } - virtual std::string getDataType() { return ""; } - - virtual void call(SubscriptionCallbackHelperCallParams& params) - { - (void)params; - { - boost::mutex::scoped_lock lock(mutex_); - ++calls_; - } - - if (cb_) - { - cb_(); - } - } - - virtual const std::type_info& getTypeInfo() { return typeid(FakeMessage); } - virtual bool isConst() { return true; } - virtual bool hasHeader() { return false; } - - boost::mutex mutex_; - int32_t calls_; - - boost::function cb_; -}; -typedef boost::shared_ptr FakeSubHelperPtr; +using namespace ros; TEST(SubscriptionQueue, queueSize) { @@ -127,7 +78,7 @@ TEST(SubscriptionQueue, queueSize) ASSERT_EQ(queue.call(), CallbackInterface::Success); ASSERT_EQ(queue.call(), CallbackInterface::Invalid); - ASSERT_EQ(helper->calls_, 2); + ASSERT_EQ(helper->calls_, 2u); } TEST(SubscriptionQueue, infiniteQueue) @@ -158,7 +109,7 @@ TEST(SubscriptionQueue, infiniteQueue) ASSERT_EQ(queue.call(), CallbackInterface::Invalid); - ASSERT_EQ(helper->calls_, 10001); + ASSERT_EQ(helper->calls_, 10001u); } TEST(SubscriptionQueue, clearCall) @@ -254,7 +205,7 @@ TEST(SubscriptionQueue, concurrentCallbacks) t1.join(); t2.join(); - ASSERT_EQ(helper->calls_, 2); + ASSERT_EQ(helper->calls_, 2u); } void waitForASecond() @@ -276,9 +227,9 @@ TEST(SubscriptionQueue, nonConcurrentOrdering) t1.join(); t2.join(); - ASSERT_EQ(helper->calls_, 1); + ASSERT_EQ(helper->calls_, 1u); queue.call(); - ASSERT_EQ(helper->calls_, 2); + ASSERT_EQ(helper->calls_, 2u); } int main(int argc, char** argv)