Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1577 revisited: Fix dynamic windowing for Topic Statistics #1695

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions clients/roscpp/include/ros/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ class ROSCPP_DECL StatisticsLogger

private:

// these are hard constrains
// Range of window length, in seconds
int max_window;
int min_window;

// these are soft constrains
// Range of acceptable messages in window.
// Window size will be adjusted if number of observed is
// outside this range.
int max_elements;
int min_elements;

Expand Down
7 changes: 4 additions & 3 deletions clients/roscpp/src/libros/statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ void StatisticsLogger::callback(const boost::shared_ptr<M_string>& connection_he
}

// should publish new statistics?
if (stats.last_publish + ros::Duration(1 / pub_frequency_) < received_time)
double pub_period = 1.0 / pub_frequency_;
if (stats.last_publish + ros::Duration(pub_period) < received_time)
{
ros::Time window_start = stats.last_publish;
stats.last_publish = received_time;
Expand Down Expand Up @@ -236,11 +237,11 @@ void StatisticsLogger::callback(const boost::shared_ptr<M_string>& connection_he
pub_.publish(msg);

// dynamic window resizing
if (stats.arrival_time_list.size() > static_cast<size_t>(max_elements) && pub_frequency_ * 2 <= max_window)
if (stats.arrival_time_list.size() > static_cast<size_t>(max_elements) && pub_period / 2.0 >= min_window)
{
pub_frequency_ *= 2;
}
if (stats.arrival_time_list.size() < static_cast<size_t>(min_elements) && pub_frequency_ / 2 >= min_window)
if (stats.arrival_time_list.size() < static_cast<size_t>(min_elements) && pub_period * 2.0 <= max_window)
{
pub_frequency_ /= 2;
}
Expand Down
17 changes: 9 additions & 8 deletions clients/rospy/src/rospy/impl/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ def read_parameters(self):
Fetch window parameters from parameter server
"""

# Range of window length, in seconds
self.min_elements = rospy.get_param("/statistics_window_min_elements", 10)
self.max_elements = rospy.get_param("/statistics_window_max_elements", 100)

# Range of acceptable messages in window.
# Window size will be adjusted if number of observed is
# outside this range.
self.max_window = rospy.get_param("/statistics_window_max_size", 64)
self.min_elements = rospy.get_param("/statistics_window_min_elements", 10)
self.max_elements = rospy.get_param("/statistics_window_max_elements", 100)

# Range of window length, in seconds
self.min_window = rospy.get_param("/statistics_window_min_size", 4)
self.max_window = rospy.get_param("/statistics_window_max_size", 64)

def callback(self, msg, publisher, stat_bytes):
"""
Expand Down Expand Up @@ -208,9 +208,10 @@ def sendStatistics(self, subscriber_statistics_logger):
self.pub.publish(msg)

# adjust window, if message count is not appropriate.
if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and self.pub_frequency.to_sec() * 2 <= subscriber_statistics_logger.max_window:
pub_period = 1.0 / self.pub_frequency.to_sec()
if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and pub_period / 2 >= subscriber_statistics_logger.min_window:
self.pub_frequency *= 2
if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and self.pub_frequency.to_sec() / 2 >= subscriber_statistics_logger.min_window:
if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and pub_period * 2 <= subscriber_statistics_logger.max_window:
self.pub_frequency /= 2

# clear collected stats, start new window.
Expand Down Expand Up @@ -257,7 +258,7 @@ def callback(self, subscriber_statistics_logger, msg, stat_bytes):
self.last_seq_ = msg.header.seq

# send out statistics with a certain frequency
if self.last_pub_time + self.pub_frequency < arrival_time:
if self.last_pub_time + rospy.Duration(1.0 / self.pub_frequency.to_sec()) < arrival_time:
self.last_pub_time = arrival_time
self.sendStatistics(subscriber_statistics_logger)

Expand Down
1 change: 1 addition & 0 deletions test/test_roscpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ add_rostest(launch/ns_node_remapping.xml)
add_rostest(launch/search_param.xml)

add_rostest(launch/stamped_topic_statistics_with_empty_timestamp.xml)
add_rostest(launch/topic_statistic_frequency.xml DEPENDENCIES ${PROJECT_NAME}-publisher_rate ${PROJECT_NAME}-subscriber ${PROJECT_NAME}-topic_statistic_frequency)

# Test spinners
add_rostest(launch/spinners.xml)
39 changes: 39 additions & 0 deletions test/test_roscpp/test/launch/topic_statistic_frequency.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<!-- basic smoke test for TopicStatistics -->
<launch>
<param name="/enable_statistics" value="true" />
<!-- default 10 would take 5s to warm up for very slow talker -->
<param name="/statistics_window_min_elements" value="4" />

<!-- under 1Hz important, since checking window starts there -->
<node name="very_slow_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="0.8">
<remap from="data" to="very_slow_chatter" />
</node>
<node name="very_slow_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="very_slow_chatter" />
</node>

<!-- publishing within fairly normal range of frequencies -->
<node name="slow_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="18">
<remap from="data" to="slow_chatter" />
</node>
<node name="slow_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="slow_chatter" />
</node>

<node name="fast_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="53">
<remap from="data" to="fast_chatter" />
</node>
<node name="fast_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="fast_chatter" />
</node>

<!-- fast outlier (for most ros systems) -->
<node name="very_fast_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="171">
<remap from="data" to="/very_fast_chatter" />
</node>
<node name="very_fast_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="/very_fast_chatter" />
</node>

<test test-name="roscpp_topic_statistics" pkg="test_roscpp" type="test_roscpp-topic_statistic_frequency" />
</launch>
11 changes: 10 additions & 1 deletion test/test_roscpp/test/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,20 @@ add_dependencies(${PROJECT_NAME}-string_msg_expect ${std_msgs_EXPORTED_TARGETS})
add_executable(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp EXCLUDE_FROM_ALL stamped_topic_statistics_empty_timestamp.cpp)
target_link_libraries(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp ${GTEST_LIBRARIES} ${catkin_LIBRARIES})

# The publisher and subscriber are compiled but not registered as a unit test
add_executable(${PROJECT_NAME}-topic_statistic_frequency EXCLUDE_FROM_ALL topic_statistic_frequency.cpp)
target_link_libraries(${PROJECT_NAME}-topic_statistic_frequency ${GTEST_LIBRARIES} ${catkin_LIBRARIES})

# The publishers and subscriber are compiled but not registered as a unit test
# since the test execution requires a network connection which drops packages.
# Call scripts/test_udp_with_dropped_packets.sh to run the test.
add_executable(${PROJECT_NAME}-publisher EXCLUDE_FROM_ALL publisher.cpp)
target_link_libraries(${PROJECT_NAME}-publisher ${catkin_LIBRARIES})
add_dependencies(${PROJECT_NAME}-publisher ${std_msgs_EXPORTED_TARGETS})

add_executable(${PROJECT_NAME}-publisher_rate EXCLUDE_FROM_ALL publisher_rate.cpp)
target_link_libraries(${PROJECT_NAME}-publisher_rate ${catkin_LIBRARIES})
add_dependencies(${PROJECT_NAME}-publisher_rate ${std_msgs_EXPORTED_TARGETS})

add_executable(${PROJECT_NAME}-subscriber EXCLUDE_FROM_ALL subscriber.cpp)
target_link_libraries(${PROJECT_NAME}-subscriber ${catkin_LIBRARIES})
add_dependencies(${PROJECT_NAME}-subscriber ${std_msgs_EXPORTED_TARGETS})
Expand Down Expand Up @@ -281,6 +288,7 @@ if(TARGET tests)
${PROJECT_NAME}-publisher
${PROJECT_NAME}-subscriber
${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp
${PROJECT_NAME}-topic_statistic_frequency
)
endif()

Expand Down Expand Up @@ -346,3 +354,4 @@ add_dependencies(${PROJECT_NAME}-left_right ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-string_msg_expect ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp
${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-topic_statistic_frequency ${${PROJECT_NAME}_EXPORTED_TARGETS})
38 changes: 38 additions & 0 deletions test/test_roscpp/test/src/publisher_rate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Publish big data chunks
// Author: Max Schwarz <max.schwarz@uni-bonn.de>

#include <ros/publisher.h>
#include <ros/init.h>
#include <ros/node_handle.h>

#include <std_msgs/Int8MultiArray.h>

int main(int argc, char** argv)
{
ros::init(argc, argv, "publisher");
ros::NodeHandle n;

const size_t NUM_BYTES = 8;
std_msgs::Int8MultiArray data;
data.data.reserve(NUM_BYTES);

assert(argc > 1);
float frequency = atof(argv[1]);

ros::Publisher pub = n.advertise<std_msgs::Int8MultiArray>("data", 1);
ros::Rate rate(frequency);

size_t start = 0;
while(ros::ok())
{
data.data.clear();
for(size_t i = 0; i < NUM_BYTES; ++i)
{
data.data.push_back(start + i);
}
pub.publish(data);
rate.sleep();
start++;
}
return 0;
}
65 changes: 65 additions & 0 deletions test/test_roscpp/test/src/topic_statistic_frequency.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <map>
#include <string>

#include <ros/ros.h>
#include <gtest/gtest.h>
#include <rosgraph_msgs/TopicStatistics.h>
#include <boost/thread.hpp>
#include <std_msgs/Int8MultiArray.h>

class Aggregator {
public:
std::map<std::string, ros::Duration> topic_period_mean_map_;

void cb(const rosgraph_msgs::TopicStatistics& msg) {
topic_period_mean_map_[msg.topic] = msg.period_mean;
}

bool frequencyAcceptable(const std::string& topic, float expected) {
float errorMargin = 0.1;
float foundFreq = 1.f / topic_period_mean_map_[topic].toSec();
return std::fabs(foundFreq - expected) / expected <= errorMargin;
}
};

void assertEventuallyHasTopic(const Aggregator& agg, const std::string& topic) {
ros::Duration timeout(5.0);
auto start = ros::Time::now();
while (ros::Time::now() - start < timeout && !agg.topic_period_mean_map_.count(topic)) {
ros::Duration(0.5).sleep();
}
ASSERT_EQ(agg.topic_period_mean_map_.count(topic), 1u);
}

TEST(TopicStatisticFrequency, statisticFrequency)
{
ros::NodeHandle nh;
Aggregator agg;
ros::Subscriber stat_sub = nh.subscribe("/statistics", 1, &Aggregator::cb, &agg);

ros::AsyncSpinner spinner(4);
spinner.start();

ros::Duration(5.0).sleep();

assertEventuallyHasTopic(agg, "/very_fast_chatter");
assertEventuallyHasTopic(agg, "/fast_chatter");
assertEventuallyHasTopic(agg, "/slow_chatter");
assertEventuallyHasTopic(agg, "/very_slow_chatter");

ros::shutdown();

ASSERT_TRUE(agg.frequencyAcceptable("/very_fast_chatter", 171));
ASSERT_TRUE(agg.frequencyAcceptable("/fast_chatter", 53));
ASSERT_TRUE(agg.frequencyAcceptable("/slow_chatter", 18));
ASSERT_TRUE(agg.frequencyAcceptable("/very_slow_chatter", 0.8));
}


int main(int argc, char** argv)
{
testing::InitGoogleTest(&argc, argv);
ros::init(argc, argv, "topic_statistic_frequency");
ros::NodeHandle nh;
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions test/test_rospy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,5 @@ if(CATKIN_ENABLE_TESTING)
add_rostest(test/rostest/on_shutdown.test)
add_rostest(test/rostest/sub_to_multiple_pubs.test)
add_rostest(test/rostest/latch_unsubscribe.test)
add_rostest(test/rostest/statistics.test)
endif()
58 changes: 58 additions & 0 deletions test/test_rospy/nodes/freq_talker
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

'''
Copy of talker demo for stats testing purposes. Publishes String to 'chatter'
'''

import rospy
from std_msgs.msg import String

NAME = 'talker'


def talker():
pub = rospy.Publisher("chatter", String, queue_size=1)
rospy.init_node(NAME, anonymous=True)
freq = rospy.get_param("~frequency", 10)
rate = rospy.Rate(freq)

count = 0
while not rospy.is_shutdown():
pub.publish(String("hello world {}".format(count)))
count += 1
rate.sleep()


if __name__ == '__main__':
talker()
42 changes: 42 additions & 0 deletions test/test_rospy/test/rostest/statistics.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<!-- basic smoke test for TopicStatistics -->
<launch>
<param name="/enable_statistics" value="true" />
<!-- default 10 would take 5s to warm up for very slow talker -->
<param name="/statistics_window_min_elements" value="5" />

<!-- under 1Hz important, since checking window starts there -->
<node name="very_slow_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="0.5" />
<remap from="chatter" to="very_slow_chatter" />
</node>
<node name="very_slow_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="very_slow_chatter" />
</node>
<!-- publishing within fairly normal range of frequencies -->
<node name="slow_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="8" />
<remap from="chatter" to="slow_chatter" />
</node>
<node name="slow_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="slow_chatter" />
</node>

<node name="fast_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="53" />
<remap from="chatter" to="fast_chatter" />
</node>
<node name="fast_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="fast_chatter" />
</node>

<!-- fast outlier (for most ros systems) -->
<node name="very_fast_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="150" />
<remap from="chatter" to="very_fast_chatter" />
</node>
<node name="very_fast_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="very_fast_chatter" />
</node>

<test test-name="rospy_topic_statistics" pkg="test_rospy" type="test_topic_statistics.py" />
</launch>
Loading