diff --git a/clients/roscpp/include/ros/statistics.h b/clients/roscpp/include/ros/statistics.h index 738fc4f437..5a5a86cea6 100644 --- a/clients/roscpp/include/ros/statistics.h +++ b/clients/roscpp/include/ros/statistics.h @@ -67,11 +67,13 @@ class ROSCPP_DECL StatisticsLogger private: - // these are hard constrains + // Range of window length, in seconds int max_window; int min_window; - // these are soft constrains + // Range of acceptable messages in window. + // Window size will be adjusted if number of observed is + // outside this range. int max_elements; int min_elements; diff --git a/clients/roscpp/src/libros/statistics.cpp b/clients/roscpp/src/libros/statistics.cpp index f5bb7e961c..e96b7279a0 100644 --- a/clients/roscpp/src/libros/statistics.cpp +++ b/clients/roscpp/src/libros/statistics.cpp @@ -113,7 +113,8 @@ void StatisticsLogger::callback(const boost::shared_ptr& connection_he } // should publish new statistics? - if (stats.last_publish + ros::Duration(1 / pub_frequency_) < received_time) + double pub_period = 1.0 / pub_frequency_; + if (stats.last_publish + ros::Duration(pub_period) < received_time) { ros::Time window_start = stats.last_publish; stats.last_publish = received_time; @@ -236,11 +237,11 @@ void StatisticsLogger::callback(const boost::shared_ptr& connection_he pub_.publish(msg); // dynamic window resizing - if (stats.arrival_time_list.size() > static_cast(max_elements) && pub_frequency_ * 2 <= max_window) + if (stats.arrival_time_list.size() > static_cast(max_elements) && pub_period / 2.0 >= min_window) { pub_frequency_ *= 2; } - if (stats.arrival_time_list.size() < static_cast(min_elements) && pub_frequency_ / 2 >= min_window) + if (stats.arrival_time_list.size() < static_cast(min_elements) && pub_period * 2.0 <= max_window) { pub_frequency_ /= 2; } diff --git a/clients/rospy/src/rospy/impl/statistics.py b/clients/rospy/src/rospy/impl/statistics.py index 60855b9263..6db8b0024b 100644 --- a/clients/rospy/src/rospy/impl/statistics.py +++ b/clients/rospy/src/rospy/impl/statistics.py @@ -68,15 +68,15 @@ def read_parameters(self): Fetch window parameters from parameter server """ - # Range of window length, in seconds - self.min_elements = rospy.get_param("/statistics_window_min_elements", 10) - self.max_elements = rospy.get_param("/statistics_window_max_elements", 100) - # Range of acceptable messages in window. # Window size will be adjusted if number of observed is # outside this range. - self.max_window = rospy.get_param("/statistics_window_max_size", 64) + self.min_elements = rospy.get_param("/statistics_window_min_elements", 10) + self.max_elements = rospy.get_param("/statistics_window_max_elements", 100) + + # Range of window length, in seconds self.min_window = rospy.get_param("/statistics_window_min_size", 4) + self.max_window = rospy.get_param("/statistics_window_max_size", 64) def callback(self, msg, publisher, stat_bytes): """ @@ -208,9 +208,10 @@ def sendStatistics(self, subscriber_statistics_logger): self.pub.publish(msg) # adjust window, if message count is not appropriate. - if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and self.pub_frequency.to_sec() * 2 <= subscriber_statistics_logger.max_window: + pub_period = 1.0 / self.pub_frequency.to_sec() + if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and pub_period / 2 >= subscriber_statistics_logger.min_window: self.pub_frequency *= 2 - if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and self.pub_frequency.to_sec() / 2 >= subscriber_statistics_logger.min_window: + if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and pub_period * 2 <= subscriber_statistics_logger.max_window: self.pub_frequency /= 2 # clear collected stats, start new window. @@ -257,7 +258,7 @@ def callback(self, subscriber_statistics_logger, msg, stat_bytes): self.last_seq_ = msg.header.seq # send out statistics with a certain frequency - if self.last_pub_time + self.pub_frequency < arrival_time: + if self.last_pub_time + rospy.Duration(1.0 / self.pub_frequency.to_sec()) < arrival_time: self.last_pub_time = arrival_time self.sendStatistics(subscriber_statistics_logger) diff --git a/test/test_roscpp/test/CMakeLists.txt b/test/test_roscpp/test/CMakeLists.txt index 271738bd66..538daa5131 100644 --- a/test/test_roscpp/test/CMakeLists.txt +++ b/test/test_roscpp/test/CMakeLists.txt @@ -149,6 +149,7 @@ add_rostest(launch/ns_node_remapping.xml) add_rostest(launch/search_param.xml) add_rostest(launch/stamped_topic_statistics_with_empty_timestamp.xml) +add_rostest(launch/topic_statistic_frequency.xml DEPENDENCIES ${PROJECT_NAME}-publisher_rate ${PROJECT_NAME}-subscriber ${PROJECT_NAME}-topic_statistic_frequency) # Test spinners add_rostest(launch/spinners.xml) diff --git a/test/test_roscpp/test/launch/topic_statistic_frequency.xml b/test/test_roscpp/test/launch/topic_statistic_frequency.xml new file mode 100644 index 0000000000..ef0d3f0064 --- /dev/null +++ b/test/test_roscpp/test/launch/topic_statistic_frequency.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/test_roscpp/test/src/CMakeLists.txt b/test/test_roscpp/test/src/CMakeLists.txt index 332f49be74..41c8e6329f 100644 --- a/test/test_roscpp/test/src/CMakeLists.txt +++ b/test/test_roscpp/test/src/CMakeLists.txt @@ -205,13 +205,20 @@ add_dependencies(${PROJECT_NAME}-string_msg_expect ${std_msgs_EXPORTED_TARGETS}) add_executable(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp EXCLUDE_FROM_ALL stamped_topic_statistics_empty_timestamp.cpp) target_link_libraries(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp ${GTEST_LIBRARIES} ${catkin_LIBRARIES}) -# The publisher and subscriber are compiled but not registered as a unit test +add_executable(${PROJECT_NAME}-topic_statistic_frequency EXCLUDE_FROM_ALL topic_statistic_frequency.cpp) +target_link_libraries(${PROJECT_NAME}-topic_statistic_frequency ${GTEST_LIBRARIES} ${catkin_LIBRARIES}) + +# The publishers and subscriber are compiled but not registered as a unit test # since the test execution requires a network connection which drops packages. # Call scripts/test_udp_with_dropped_packets.sh to run the test. add_executable(${PROJECT_NAME}-publisher EXCLUDE_FROM_ALL publisher.cpp) target_link_libraries(${PROJECT_NAME}-publisher ${catkin_LIBRARIES}) add_dependencies(${PROJECT_NAME}-publisher ${std_msgs_EXPORTED_TARGETS}) +add_executable(${PROJECT_NAME}-publisher_rate EXCLUDE_FROM_ALL publisher_rate.cpp) +target_link_libraries(${PROJECT_NAME}-publisher_rate ${catkin_LIBRARIES}) +add_dependencies(${PROJECT_NAME}-publisher_rate ${std_msgs_EXPORTED_TARGETS}) + add_executable(${PROJECT_NAME}-subscriber EXCLUDE_FROM_ALL subscriber.cpp) target_link_libraries(${PROJECT_NAME}-subscriber ${catkin_LIBRARIES}) add_dependencies(${PROJECT_NAME}-subscriber ${std_msgs_EXPORTED_TARGETS}) @@ -281,6 +288,7 @@ if(TARGET tests) ${PROJECT_NAME}-publisher ${PROJECT_NAME}-subscriber ${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp + ${PROJECT_NAME}-topic_statistic_frequency ) endif() @@ -346,3 +354,4 @@ add_dependencies(${PROJECT_NAME}-left_right ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_dependencies(${PROJECT_NAME}-string_msg_expect ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_dependencies(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp ${${PROJECT_NAME}_EXPORTED_TARGETS}) +add_dependencies(${PROJECT_NAME}-topic_statistic_frequency ${${PROJECT_NAME}_EXPORTED_TARGETS}) diff --git a/test/test_roscpp/test/src/publisher_rate.cpp b/test/test_roscpp/test/src/publisher_rate.cpp new file mode 100644 index 0000000000..bada14c66a --- /dev/null +++ b/test/test_roscpp/test/src/publisher_rate.cpp @@ -0,0 +1,38 @@ +// Publish big data chunks +// Author: Max Schwarz + +#include +#include +#include + +#include + +int main(int argc, char** argv) +{ + ros::init(argc, argv, "publisher"); + ros::NodeHandle n; + + const size_t NUM_BYTES = 8; + std_msgs::Int8MultiArray data; + data.data.reserve(NUM_BYTES); + + assert(argc > 1); + float frequency = atof(argv[1]); + + ros::Publisher pub = n.advertise("data", 1); + ros::Rate rate(frequency); + + size_t start = 0; + while(ros::ok()) + { + data.data.clear(); + for(size_t i = 0; i < NUM_BYTES; ++i) + { + data.data.push_back(start + i); + } + pub.publish(data); + rate.sleep(); + start++; + } + return 0; +} diff --git a/test/test_roscpp/test/src/topic_statistic_frequency.cpp b/test/test_roscpp/test/src/topic_statistic_frequency.cpp new file mode 100644 index 0000000000..cff787d8ad --- /dev/null +++ b/test/test_roscpp/test/src/topic_statistic_frequency.cpp @@ -0,0 +1,65 @@ +#include +#include + +#include +#include +#include +#include +#include + +class Aggregator { +public: + std::map topic_period_mean_map_; + + void cb(const rosgraph_msgs::TopicStatistics& msg) { + topic_period_mean_map_[msg.topic] = msg.period_mean; + } + + bool frequencyAcceptable(const std::string& topic, float expected) { + float errorMargin = 0.1; + float foundFreq = 1.f / topic_period_mean_map_[topic].toSec(); + return std::fabs(foundFreq - expected) / expected <= errorMargin; + } +}; + +void assertEventuallyHasTopic(const Aggregator& agg, const std::string& topic) { + ros::Duration timeout(5.0); + auto start = ros::Time::now(); + while (ros::Time::now() - start < timeout && !agg.topic_period_mean_map_.count(topic)) { + ros::Duration(0.5).sleep(); + } + ASSERT_EQ(agg.topic_period_mean_map_.count(topic), 1u); +} + +TEST(TopicStatisticFrequency, statisticFrequency) +{ + ros::NodeHandle nh; + Aggregator agg; + ros::Subscriber stat_sub = nh.subscribe("/statistics", 1, &Aggregator::cb, &agg); + + ros::AsyncSpinner spinner(4); + spinner.start(); + + ros::Duration(5.0).sleep(); + + assertEventuallyHasTopic(agg, "/very_fast_chatter"); + assertEventuallyHasTopic(agg, "/fast_chatter"); + assertEventuallyHasTopic(agg, "/slow_chatter"); + assertEventuallyHasTopic(agg, "/very_slow_chatter"); + + ros::shutdown(); + + ASSERT_TRUE(agg.frequencyAcceptable("/very_fast_chatter", 171)); + ASSERT_TRUE(agg.frequencyAcceptable("/fast_chatter", 53)); + ASSERT_TRUE(agg.frequencyAcceptable("/slow_chatter", 18)); + ASSERT_TRUE(agg.frequencyAcceptable("/very_slow_chatter", 0.8)); +} + + +int main(int argc, char** argv) +{ + testing::InitGoogleTest(&argc, argv); + ros::init(argc, argv, "topic_statistic_frequency"); + ros::NodeHandle nh; + return RUN_ALL_TESTS(); +} diff --git a/test/test_rospy/CMakeLists.txt b/test/test_rospy/CMakeLists.txt index 316d6edd36..4343b5bed1 100644 --- a/test/test_rospy/CMakeLists.txt +++ b/test/test_rospy/CMakeLists.txt @@ -58,4 +58,5 @@ if(CATKIN_ENABLE_TESTING) add_rostest(test/rostest/on_shutdown.test) add_rostest(test/rostest/sub_to_multiple_pubs.test) add_rostest(test/rostest/latch_unsubscribe.test) + add_rostest(test/rostest/statistics.test) endif() diff --git a/test/test_rospy/nodes/freq_talker b/test/test_rospy/nodes/freq_talker new file mode 100755 index 0000000000..ac25d8e89c --- /dev/null +++ b/test/test_rospy/nodes/freq_talker @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# Software License Agreement (BSD License) +# +# Copyright (c) 2008, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +''' +Copy of talker demo for stats testing purposes. Publishes String to 'chatter' +''' + +import rospy +from std_msgs.msg import String + +NAME = 'talker' + + +def talker(): + pub = rospy.Publisher("chatter", String, queue_size=1) + rospy.init_node(NAME, anonymous=True) + freq = rospy.get_param("~frequency", 10) + rate = rospy.Rate(freq) + + count = 0 + while not rospy.is_shutdown(): + pub.publish(String("hello world {}".format(count))) + count += 1 + rate.sleep() + + +if __name__ == '__main__': + talker() diff --git a/test/test_rospy/test/rostest/statistics.test b/test/test_rospy/test/rostest/statistics.test new file mode 100644 index 0000000000..b28cb04f20 --- /dev/null +++ b/test/test_rospy/test/rostest/statistics.test @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/test_rospy/test/rostest/test_topic_statistics.py b/test/test_rospy/test/rostest/test_topic_statistics.py new file mode 100755 index 0000000000..d7642725ed --- /dev/null +++ b/test/test_rospy/test/rostest/test_topic_statistics.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +# Software License Agreement (BSD License) +# +# Copyright (c) 2008, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +''' +Integration test for topic statistics +''' + +from __future__ import print_function +import sys +import unittest + +import rospy +import rostest +from rosgraph_msgs.msg import TopicStatistics + +PKG = 'test_rospy' + + +class TestTopicStatistics(unittest.TestCase): + def setUp(self): + self.topic_statistic_msg_map = {} + + def new_msg(self, msg): + self.topic_statistic_msg_map[msg.topic] = msg + + def assert_eventually( + self, cond, timeout=rospy.Duration(5.0), interval=rospy.Duration(0.5) + ): + started = rospy.Time.now() + while rospy.Time.now() - started < timeout: + if cond(): + return True + rospy.sleep(interval) + self.assertTrue(False) + + def frequency_acceptable(self, topic, expected, error_margin=0.1): + ''' return True if topic message's measured frequency + is within some error margin of expected frequency ''' + msg = self.topic_statistic_msg_map[topic] + found_freq = 1.0 / msg.period_mean.to_sec() + rospy.loginfo( + "Testing {}'s found frequency {} against expected {}".format( + topic, found_freq, expected)) + return abs(found_freq - expected) / expected <= error_margin + + def test_frequencies(self): + rospy.Subscriber('/statistics', TopicStatistics, self.new_msg) + + self.assert_eventually( + lambda: '/very_fast_chatter' in self.topic_statistic_msg_map) + self.assert_eventually( + lambda: '/fast_chatter' in self.topic_statistic_msg_map) + self.assert_eventually( + lambda: '/slow_chatter' in self.topic_statistic_msg_map) + self.assert_eventually( + lambda: '/very_slow_chatter' in self.topic_statistic_msg_map) + + self.assertTrue(self.frequency_acceptable('/very_fast_chatter', 150)) + self.assertTrue(self.frequency_acceptable('/fast_chatter', 53)) + self.assertTrue(self.frequency_acceptable('/slow_chatter', 8)) + self.assertTrue(self.frequency_acceptable('/very_slow_chatter', 0.5)) + + +if __name__ == '__main__': + rospy.init_node('test_topic_statistics') + rostest.run(PKG, 'rospy_topic_statistics', TestTopicStatistics, sys.argv)