-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1214 from wkentaro/refactor-test-block
[jsk_topic_tools] Refactor test codes as good examples
- Loading branch information
Showing
5 changed files
with
82 additions
and
67 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,60 +1,62 @@ | ||
#!/usr/bin/env python | ||
|
||
import os | ||
import sys | ||
|
||
import unittest | ||
|
||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", | ||
"scripts"))) | ||
|
||
import rospy | ||
|
||
try: | ||
from std_msgs.msg import String | ||
except: | ||
import roslib; roslib.load_manifest("jsk_topic_tools") | ||
import roslib | ||
roslib.load_manifest("jsk_topic_tools") | ||
from std_msgs.msg import String | ||
|
||
|
||
PKG = 'jsk_topic_tools' | ||
NAME = 'test_block' | ||
|
||
|
||
class TestBlock(unittest.TestCase): | ||
input_msg = None | ||
subscriber = None | ||
def reset_subscribers(self): | ||
global running | ||
|
||
def __init__(self, *args): | ||
super(TestBlock, self).__init__(*args) | ||
rospy.init_node(NAME) | ||
self.output_original_pub = rospy.Publisher( | ||
'output_original', String, queue_size=1) | ||
self.input_original_sub = rospy.Subscriber( | ||
'input_original', String, self.original_topic_cb) | ||
|
||
def original_topic_cb(self, msg): | ||
self.running = True | ||
self.output_original_pub.publish(msg) | ||
|
||
def reset_subscriber(self): | ||
if self.subscriber: | ||
self.subscriber.unregister() | ||
seff.subscriber = None | ||
self._input_msg = None | ||
running = False | ||
self.subscriber = None | ||
self.input_msg = None | ||
self.running = False | ||
|
||
def test_subscribe(self): | ||
self.reset_subscriber() | ||
self.subscriber = rospy.Subscriber("output", String, self.cb) | ||
rospy.loginfo("wait 10 seconds...") | ||
rospy.sleep(10) | ||
self.assertFalse(self.input_msg is None) | ||
|
||
def cb(self, msg): | ||
self.input_msg = msg | ||
def test_no_(self): | ||
global running | ||
|
||
def test_no_subscribe(self): | ||
self.reset_subscriber() | ||
# do not subscribe | ||
self.reset_subscribers() | ||
rospy.loginfo("wait 10 seconds...") | ||
rospy.sleep(10) | ||
self.assertTrue(self.input_msg == None) | ||
def test_sub(self): | ||
global running | ||
self.reset_subscribers() | ||
self.subscribe = rospy.Subscriber("/output", String, self.cb) | ||
rospy.loginfo("wait 10 seconds...") | ||
rospy.sleep(10) | ||
self.assertFalse(self.input_msg == None) | ||
self.assertTrue(self.input_msg is None) | ||
|
||
running = False | ||
output_original_pub = None | ||
def topic_cb(msg): | ||
global running | ||
running = True | ||
output_original_pub.publish(msg) | ||
|
||
if __name__ == "__main__": | ||
import rostest | ||
rospy.init_node("test_blocke") | ||
output_original_pub = rospy.Publisher("/output_original", String) | ||
s = rospy.Subscriber("/input_original", String, topic_cb) | ||
rostest.rosrun("jsk_topic_tools", "test_block", TestBlock) | ||
|
||
rostest.rosrun(PKG, NAME, TestBlock) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,55 @@ | ||
#!/usr/bin/env python | ||
import os | ||
import sys | ||
# -*- coding: utf-8 -*- | ||
|
||
import unittest | ||
|
||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", | ||
"scripts"))) | ||
from topic_compare import ROSTopicCompare | ||
|
||
import rospy | ||
import time | ||
|
||
import numpy as np | ||
|
||
import rospy | ||
try: | ||
from std_msgs.msg import Float32 | ||
except: | ||
import roslib; roslib.load_manifest("jsk_topic_tools") | ||
import roslib | ||
roslib.load_manifest("jsk_topic_tools") | ||
from std_msgs.msg import Float32 | ||
|
||
|
||
PKG = 'jsk_topic_tools' | ||
NAME = 'test_hz_measure' | ||
|
||
|
||
def eps_equal(a, b, err=0.001): | ||
return abs(a - b) < err | ||
|
||
hz_msg = None | ||
|
||
def topic_cb(msg): | ||
global hz_msg | ||
hz_msg = msg | ||
class TestHzMeasure(unittest.TestCase): | ||
|
||
def __init__(self, *args): | ||
super(TestHzMeasure, self).__init__(*args) | ||
self.hz_msg = None | ||
rospy.init_node(NAME) | ||
rospy.Subscriber('/hz/output', Float32, self.topic_cb) | ||
|
||
def topic_cb(self, msg): | ||
self.hz_msg = msg | ||
|
||
class TestHzMeasure(unittest.TestCase): | ||
def test_hz(self): | ||
global hz_msg | ||
while hz_msg == None: | ||
while self.hz_msg is None: | ||
rospy.loginfo('wait for msg...') | ||
if not rospy.is_shutdown(): | ||
rospy.sleep(5.0) #wait for 5 sec | ||
rospy.sleep(5.0) # wait for 5 sec | ||
# should be 30Hz | ||
msgs = [] | ||
while len(msgs) < 30: | ||
msgs.append(hz_msg.data) | ||
msgs.append(self.hz_msg.data) | ||
# rospy.loginfo('hz of msg %s'%hz_msg.data) | ||
rospy.sleep(0.1) | ||
hz = np.median(msgs) | ||
rospy.loginfo('average hz of msg %s'%np.mean(msgs)) | ||
rospy.loginfo('median hz of msg %s'%hz) | ||
rospy.loginfo('average hz of msg %s' % np.mean(msgs)) | ||
rospy.loginfo('median hz of msg %s' % hz) | ||
self.assertTrue(eps_equal(hz, 30, 1)) | ||
|
||
|
||
if __name__ == "__main__": | ||
import rostest | ||
rospy.init_node("test_hz_measure") | ||
s = rospy.Subscriber("/hz/output", Float32, topic_cb) | ||
rostest.rosrun("jsk_topic_tools", "test_hz_measure", TestHzMeasure) | ||
rostest.rosrun(PKG, NAME, TestHzMeasure) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters