Skip to content

Commit

Permalink
remove messages that are newer than the newly added message (ros#1438)
Browse files Browse the repository at this point in the history
* remove messages that are newer than the newly added message

* use /clock as reference for detecting backward jumps

* set time to initialised after constructing ApproximateTimeSynchronizer

* notify user about clearing buffer

* use continuous time for testing header-less message synchronisation

* test buffer clearing when jumping back in time

* avoid unrelated changes, remove double space
  • Loading branch information
christian-rauch authored and tahsinkose committed Apr 15, 2019
1 parent 9c47287 commit 6a5d97a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 10 deletions.
10 changes: 10 additions & 0 deletions utilities/message_filters/src/message_filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def __init__(self, fs, queue_size, slop, allow_headerless=False):
TimeSynchronizer.__init__(self, fs, queue_size)
self.slop = rospy.Duration.from_sec(slop)
self.allow_headerless = allow_headerless
self.last_added = rospy.Time()

def add(self, msg, my_queue, my_queue_index=None):
if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'):
Expand All @@ -257,6 +258,15 @@ def add(self, msg, my_queue, my_queue_index=None):

self.lock.acquire()
my_queue[stamp] = msg

# clear all buffers if jump backwards in time is detected
now = rospy.Time.now()
if now < self.last_added:
rospy.loginfo("ApproximateTimeSynchronizer: Detected jump back in time. Clearing buffer.")
for q in self.queues:
q.clear()
self.last_added = now

while len(my_queue) > self.queue_size:
del my_queue[min(my_queue)]
# self.queues = [topic_0 {stamp: msg}, topic_1 {stamp: msg}, ...]
Expand Down
51 changes: 41 additions & 10 deletions utilities/message_filters/test/test_approxsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import rospy
import unittest
import random
import copy

import message_filters
from message_filters import ApproximateTimeSynchronizer
Expand Down Expand Up @@ -65,6 +66,7 @@ def test_approx(self):
m0 = MockFilter()
m1 = MockFilter()
ts = ApproximateTimeSynchronizer([m0, m1], 1, 0.1)
rospy.rostime.set_rostime_initialized(True)
ts.registerCallback(self.cb_collector_2msg)

if 0:
Expand Down Expand Up @@ -98,25 +100,54 @@ def test_approx(self):

# Scramble sequences of length N of headerless and header-having messages.
# Make sure that TimeSequencer recombines them.
rospy.rostime.set_rostime_initialized(True)
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq1 = [MockHeaderlessMessage(random.random()) for t in range(N)]
# random.shuffle(seq0)
# new shuffled sequences
seq0r = copy.copy(seq0)
seq1r = copy.copy(seq1)
random.shuffle(seq0r)
random.shuffle(seq1r)
ts = ApproximateTimeSynchronizer([m0, m1], N, 0.1, allow_headerless=True)
ts.registerCallback(self.cb_collector_2msg)
self.collector = []
for msg in random.sample(seq0, N):
m0.signalMessage(msg)
self.assertEqual(self.collector, [])
for i in random.sample(range(N), N):
msg = seq1[i]
rospy.rostime._set_rostime(rospy.Time(i+0.05))
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))
for i in range(N):
# clock time needs to be continuous
rospy.rostime._set_rostime(rospy.Time(i))
m0.signalMessage(seq0r[i])
m1.signalMessage(seq1r[i])
self.assertEqual(set(self.collector), set(zip(seq0, seq1r)))

# clear buffer on sequences with non-continuous time
random.seed(0)
for N in range(2, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq1 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
ts = ApproximateTimeSynchronizer([m0, m1], N, 0.1)
ts.registerCallback(self.cb_collector_2msg)
self.collector = []
seq_time = [rospy.Time(i) for i in range(10, 10+N)]
# select a random time in sequence at which time jumps backwards
ind_rand = random.randint(1, N-1)
random_jump = random.uniform(1/1000.0, 100/1000.0)
# jump backward in time by 'random_jump', starting at 'ind_rand'
for i in range(N-1, ind_rand-1, -1):
seq_time[i] = seq_time[i-1]-rospy.Duration(random_jump)
for i in range(N):
rospy.rostime._set_rostime(seq_time[i])
m0.signalMessage(seq0[i])
if i==ind_rand:
# expect buffer reset
assert(len(ts.queues[0])==0 and len(ts.queues[1])==0)
m1.signalMessage(seq1[i])
# expect N synchronisation minus 1 buffer reset
assert(len(self.collector)==(N-1))


if __name__ == '__main__':
if 1:
Expand Down

0 comments on commit 6a5d97a

Please sign in to comment.