From 447e3c88ec1cd0622ae78deb699f26e01e2ab5e9 Mon Sep 17 00:00:00 2001 From: Gabriella Conde Date: Sun, 30 Jul 2023 07:13:56 -0400 Subject: [PATCH 1/7] Move docstring into class --- .../utils/mil_tools/nodes/online_bagger.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/mil_common/utils/mil_tools/nodes/online_bagger.py b/mil_common/utils/mil_tools/nodes/online_bagger.py index 2580f5cf2..16c5aaa65 100755 --- a/mil_common/utils/mil_tools/nodes/online_bagger.py +++ b/mil_common/utils/mil_tools/nodes/online_bagger.py @@ -19,20 +19,21 @@ ) from tqdm import tqdm -""" -Online Bagger is a node which subscribes to a list of ros topics, -maintaining a buffer of the most recent n seconds. Parts or all of -these buffered topics can be written to a bag file by -sending a new goal to the /online_bagger/bag action server. -When run with the -c flag, instead runs an action client which connects -to online bagger, triggering a bag write and displaying a progress bar -as it writes. - -""" +class OnlineBagger: + """ + Node that maintains a list of bagged information relating to the specified + topics. + Subscribes to a list of ROS topics, and maintains a buffer of the most recent + n seconds. Parts or all of these buffered topics can be written to a bag + file by sending a new goal to the /online_bagger/bag action server. When + run with the -c flag, instead runs an action client which connects to online + bagger, triggering a bag write and displaying a progress bar as it writes. -class OnlineBagger: + Attributes: + BAG_TOPIC (str): The action server that OnlineBagger sends goals to. + """ BAG_TOPIC = "/online_bagger/bag" def __init__(self): From 90d876cffe4dbd659a6e5f5f37153917a9dde330 Mon Sep 17 00:00:00 2001 From: Gabriella Conde Date: Wed, 2 Aug 2023 21:36:49 -0400 Subject: [PATCH 2/7] Add missing args/returns --- .../utils/mil_tools/nodes/online_bagger.py | 105 ++++++++++++++++-- 1 file changed, 97 insertions(+), 8 deletions(-) diff --git a/mil_common/utils/mil_tools/nodes/online_bagger.py b/mil_common/utils/mil_tools/nodes/online_bagger.py index 16c5aaa65..fab68ae82 100755 --- a/mil_common/utils/mil_tools/nodes/online_bagger.py +++ b/mil_common/utils/mil_tools/nodes/online_bagger.py @@ -68,6 +68,15 @@ def get_subscriber_list(self, status): """ Get string of all topics, if their subscribe status matches the input (True / False) Outputs each topics: time_buffer(float in seconds), subscribe_statue(bool), topic(string) + + Args: + status (bool): The subscription status used to search for topics with a matching + subcription status. + + Returns: + sub_list (string): The list of topics that match the desired subscribe status. Each + line in the list contains the buffer time (in seconds) of the topic, the subscrition + status of the topic, and the topic name. """ sub_list = "" for topic in self.subscriber_list: @@ -103,10 +112,23 @@ def get_params(self): self.subscriber_list[topic[0]] = (time, False) def add_unique_topic(topic): + """ + Adds a topic to the subscriber list if the topic is not already in the + list. + + Args: + topic (str): The name of the topic to add to the subscriber list. + """ if topic not in self.subscriber_list: self.subscriber_list[topic] = (self.stream_time, False) def add_env_var(var): + """ + Adds topic(s) to the subscriber list. + + Args: + var (str): The topic(s) to add to the subscriber list. + """ for topic in var.split(): add_unique_topic(topic) @@ -206,11 +228,16 @@ def subscribe(self, time_info=None): where the Bool tells the current status of the subscriber (success/failure). Return number of topics that failed subscription + + Args: + time_info (): An instance of the TimerEvent class. This method does + not use the argument but it is required or an error will occur. The + default value is None. """ if self.successful_subscription_count == len(self.subscriber_list): if self.resubscriber is not None: self.resubscriber.shutdown() - rospy.loginfo("All topics subscribed too! Shutting down resubscriber") + rospy.loginfo("All topics subscribed too! Shutting down resubscriber") for topic, (time, subscribed) in self.subscriber_list.items(): if not subscribed: @@ -227,7 +254,13 @@ def subscribe(self, time_info=None): def get_topic_duration(self, topic): """ - Return current time duration of topic + Returns the current time duration of topic + + Args: + topic (rostopic): The topic for which the duration will be calculated. + + Returns: + duration (Duration): The time duration of the topic. """ return self.topic_messages[topic][-1][0] - self.topic_messages[topic][0][0] @@ -235,8 +268,14 @@ def get_topic_duration(self, topic): def get_header_time(self, msg): """ Retrieve header time if available - """ + Args: + msg (msg): The ROS message from which to extract the time. + + Returns: + msg.header.stamp (stamp): The timestamp of the topic's header if the topic + has a header. Otherwise, the current time is returned. + """ if hasattr(msg, "header"): return msg.header.stamp else: @@ -244,7 +283,8 @@ def get_header_time(self, msg): def get_time_index(self, topic, requested_seconds): """ - Return the index for the time index for a topic at 'n' seconds from the end of the dequeue + Returns the index for the time index for a topic at 'n' seconds from the end of the dequeue. + For example, to bag the last 10 seconds of data, the index for 10 seconds back from the most recent message can be obtained with this function. The number of requested seconds should be the number of seoncds desired from @@ -252,6 +292,15 @@ def get_time_index(self, topic, requested_seconds): If the desired time length of the bag is greater than the available messages it will output a message and return how ever many seconds of data are available at the moment. Seconds is of a number type (not a rospy.Time type) (ie. int, float) + + Args: + topic (str): The name of the topic for which to get the time index. + requested_seconds (int/float): The number of seconds from the end of the dequeue to search + for the topic. + + Returns: + index (int): The index for the time index of the topic at requested_seconds seconds from the + end of the dequeue. """ topic_duration = self.get_topic_duration(topic).to_sec() @@ -265,10 +314,15 @@ def get_time_index(self, topic, requested_seconds): def bagger_callback(self, msg, topic): """ - Streaming callback function, stops streaming during bagging process - also pops off msgs from dequeue if stream size is greater than specified stream_time + Adds incoming messages to the appropriate topic and removes older messages if necessary. - Stream, callback function does nothing if streaming is not active + Streaming callback function, stops streaming during bagging process and pops off msgs + from dequeue if stream size is greater than specified stream_time. Stream, callback + function does nothing if streaming is not active. + + Args: + msg (msg): The incoming message. + topic (topic): The topic to which the incoming message will be added. """ if not self.streaming: @@ -301,6 +355,14 @@ def bagger_callback(self, msg, topic): def get_topic_message_count(self, topic): """ Return number of messages available in a topic + + Args: + topic (str): The name of the topic for which to calculate the number + of messages. + + Returns: + len(self.topic_messages[topic]) (int): The number of messages + available in the specified topic. """ return len(self.topic_messages[topic]) @@ -308,6 +370,10 @@ def get_topic_message_count(self, topic): def get_total_message_count(self): """ Returns total number of messages across all topics + + Returns: + total_message_count (int): The total number of messages available in + all topics. """ total_message_count = 0 @@ -319,6 +385,12 @@ def get_total_message_count(self): return total_message_count def _get_default_filename(self): + """ + Uses the current date and time to create a default bag name. + + Returns: + bag name (str): The default bag name constructed using format date-time. + """ return ( str(datetime.date.today()) + "-" + str(datetime.datetime.now().time())[0:8] ) @@ -327,6 +399,15 @@ def get_bag_name(self, filename=""): """ Create ros bag save directory If no bag name is provided, the current date/time is used as default. + + Args: + filename (str): The save directory for the ros bag. The default value + is an empty string, which will result in the default filename being + used. + + Returns: + os.path.join(bag_dir, bag_name) (str): A string representing the path + of the ros bag file. """ # If directory param is not set, default to $HOME/bags/ default_dir = self.dir @@ -353,9 +434,17 @@ def get_bag_name(self, filename=""): def start_bagging(self, req): """ + Writes collected data to a bag file. + Dump all data in dictionary to bags, temporarily stops streaming during the bagging process, resumes streaming when over. - If bagging is already false because of an active call to this service + If bagging is already false because of an active call to this service. + + Args: + req (msg): The bagging request information. + + Raises: + IOError: A problem occurs when opening or closing the bag file. """ result = BagOnlineResult() if self.streaming is False: From c9bca3f81590296f6e0322333e8f28e0d791824c Mon Sep 17 00:00:00 2001 From: Gabriella Conde Date: Wed, 2 Aug 2023 21:50:13 -0400 Subject: [PATCH 3/7] Add attributes to class --- mil_common/utils/mil_tools/nodes/online_bagger.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/mil_common/utils/mil_tools/nodes/online_bagger.py b/mil_common/utils/mil_tools/nodes/online_bagger.py index fab68ae82..e6a3731c1 100755 --- a/mil_common/utils/mil_tools/nodes/online_bagger.py +++ b/mil_common/utils/mil_tools/nodes/online_bagger.py @@ -32,7 +32,14 @@ class OnlineBagger: bagger, triggering a bag write and displaying a progress bar as it writes. Attributes: - BAG_TOPIC (str): The action server that OnlineBagger sends goals to. + successful_subscription_count (int): The number of successful subcriptions + to topics. + iteration_count (int): The number of iterations. + streaming (bool): Indicates whether the bagger is streaming. + subscriber_list (list[topic]): The list of topics subscribed to the + OnlineBagger. + _action_server (SimpleActionServer): The action server associated with the + OnlineBagger. """ BAG_TOPIC = "/online_bagger/bag" From 8dc02bebd70b70de5777a4f487b8cb8a33baf3ef Mon Sep 17 00:00:00 2001 From: Gabriella Conde Date: Sat, 5 Aug 2023 04:22:21 -0400 Subject: [PATCH 4/7] Fix indentation error --- mil_common/utils/mil_tools/nodes/online_bagger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mil_common/utils/mil_tools/nodes/online_bagger.py b/mil_common/utils/mil_tools/nodes/online_bagger.py index e6a3731c1..41222470c 100755 --- a/mil_common/utils/mil_tools/nodes/online_bagger.py +++ b/mil_common/utils/mil_tools/nodes/online_bagger.py @@ -244,7 +244,7 @@ def subscribe(self, time_info=None): if self.successful_subscription_count == len(self.subscriber_list): if self.resubscriber is not None: self.resubscriber.shutdown() - rospy.loginfo("All topics subscribed too! Shutting down resubscriber") + rospy.loginfo("All topics subscribed too! Shutting down resubscriber") for topic, (time, subscribed) in self.subscriber_list.items(): if not subscribed: From 4c0700faf38f9ad0c51949da52ab960a281edaf2 Mon Sep 17 00:00:00 2001 From: Gabriella Conde Date: Sat, 5 Aug 2023 04:23:24 -0400 Subject: [PATCH 5/7] Add OnlineBagger to docs --- docs/reference/bagging.rst | 9 +++++++++ docs/reference/index.rst | 1 + 2 files changed, 10 insertions(+) create mode 100644 docs/reference/bagging.rst diff --git a/docs/reference/bagging.rst b/docs/reference/bagging.rst new file mode 100644 index 000000000..feb007dad --- /dev/null +++ b/docs/reference/bagging.rst @@ -0,0 +1,9 @@ +Online Bagger +------------- + +.. currentmodule:: mil_tools + +.. attributetable:: nodes.online_bagger.OnlineBagger + +.. autoclass:: nodes.online_bagger.OnlineBagger + :members: diff --git a/docs/reference/index.rst b/docs/reference/index.rst index b6f96f49e..c63581a0c 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -31,3 +31,4 @@ by MIL. These subsystems relate to a variety of processes. poi pneumatic sabertooth + bagging From b706724392513e3feb6412cb8f71b3335d8b0490 Mon Sep 17 00:00:00 2001 From: Gabriella Conde Date: Sat, 5 Aug 2023 04:52:48 -0400 Subject: [PATCH 6/7] Fix nested if statements --- mil_common/utils/mil_tools/nodes/online_bagger.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/mil_common/utils/mil_tools/nodes/online_bagger.py b/mil_common/utils/mil_tools/nodes/online_bagger.py index 41222470c..8b9285a32 100755 --- a/mil_common/utils/mil_tools/nodes/online_bagger.py +++ b/mil_common/utils/mil_tools/nodes/online_bagger.py @@ -41,6 +41,7 @@ class OnlineBagger: _action_server (SimpleActionServer): The action server associated with the OnlineBagger. """ + BAG_TOPIC = "/online_bagger/bag" def __init__(self): @@ -241,9 +242,10 @@ def subscribe(self, time_info=None): not use the argument but it is required or an error will occur. The default value is None. """ - if self.successful_subscription_count == len(self.subscriber_list): - if self.resubscriber is not None: - self.resubscriber.shutdown() + if (self.successful_subscription_count == len(self.subscriber_list)) and ( + self.resubscriber is not None + ): + self.resubscriber.shutdown() rospy.loginfo("All topics subscribed too! Shutting down resubscriber") for topic, (time, subscribed) in self.subscriber_list.items(): @@ -262,12 +264,12 @@ def subscribe(self, time_info=None): def get_topic_duration(self, topic): """ Returns the current time duration of topic - + Args: topic (rostopic): The topic for which the duration will be calculated. Returns: - duration (Duration): The time duration of the topic. + duration (Duration): The time duration of the topic. """ return self.topic_messages[topic][-1][0] - self.topic_messages[topic][0][0] From 2c44a31cc2fa060c6b78960e42e992c00509fb0d Mon Sep 17 00:00:00 2001 From: Cameron Brown Date: Wed, 28 Feb 2024 16:01:43 -0500 Subject: [PATCH 7/7] Slight docs modifications --- .../utils/mil_tools/nodes/online_bagger.py | 65 ++++++++----------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/mil_common/utils/mil_tools/nodes/online_bagger.py b/mil_common/utils/mil_tools/nodes/online_bagger.py index a80e5714a..5055d3a5f 100755 --- a/mil_common/utils/mil_tools/nodes/online_bagger.py +++ b/mil_common/utils/mil_tools/nodes/online_bagger.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 +from __future__ import annotations import argparse import datetime import itertools import os from collections import deque +from typing import TYPE_CHECKING import rosbag import rospy @@ -19,6 +21,9 @@ ) from tqdm import tqdm +if TYPE_CHECKING: + import genpy + class OnlineBagger: """ @@ -32,13 +37,11 @@ class OnlineBagger: bagger, triggering a bag write and displaying a progress bar as it writes. Attributes: - successful_subscription_count (int): The number of successful subcriptions + successful_subscription_count (int): The number of successful subscriptions to topics. iteration_count (int): The number of iterations. streaming (bool): Indicates whether the bagger is streaming. - subscriber_list (list[topic]): The list of topics subscribed to the - OnlineBagger. - _action_server (SimpleActionServer): The action server associated with the + subscriber_list (list[str]): The list of topics subscribed to the OnlineBagger. """ @@ -79,10 +82,10 @@ def get_subscriber_list(self, status): Args: status (bool): The subscription status used to search for topics with a matching - subcription status. + subscription status. Returns: - sub_list (string): The list of topics that match the desired subscribe status. Each + string: The list of topics that match the desired subscribe status. Each line in the list contains the buffer time (in seconds) of the topic, the subscrition status of the topic, and the topic name. """ @@ -220,7 +223,7 @@ def subscribe_loop(self): self.subscribe, ) - def subscribe(self, time_info=None): + def subscribe(self, _: rospy.timer.TimerEvent | None = None): """ Subscribe to the topics defined in the yaml configuration file @@ -231,13 +234,6 @@ def subscribe(self, time_info=None): Each element in self.subscriber list is a list [topic, Bool] where the Bool tells the current status of the subscriber (success/failure). - - Return number of topics that failed subscription - - Args: - time_info (): An instance of the TimerEvent class. This method does - not use the argument but it is required or an error will occur. The - default value is None. """ if (self.successful_subscription_count == len(self.subscriber_list)) and ( self.resubscriber is not None @@ -258,28 +254,27 @@ def subscribe(self, time_info=None): self.subscriber_list[topic] = (time, True) - def get_topic_duration(self, topic): + def get_topic_duration(self, topic: str): """ Returns the current time duration of topic Args: - topic (rostopic): The topic for which the duration will be calculated. + topic (str): The topic for which the duration will be calculated. Returns: - duration (Duration): The time duration of the topic. + Duration: The time duration of the topic. """ - return self.topic_messages[topic][-1][0] - self.topic_messages[topic][0][0] - def get_header_time(self, msg): + def get_header_time(self, msg: genpy.Message): """ Retrieve header time if available Args: - msg (msg): The ROS message from which to extract the time. + msg (genpy.Message): The ROS message from which to extract the time. Returns: - msg.header.stamp (stamp): The timestamp of the topic's header if the topic + rospy.Time: The timestamp of the topic's header if the topic has a header. Otherwise, the current time is returned. """ if hasattr(msg, "header"): @@ -305,7 +300,7 @@ def get_time_index(self, topic, requested_seconds): for the topic. Returns: - index (int): The index for the time index of the topic at requested_seconds seconds from the + int: The index for the time index of the topic at requested_seconds seconds from the end of the dequeue. """ @@ -318,7 +313,7 @@ def get_time_index(self, topic, requested_seconds): index = int(self.get_topic_message_count(topic) * (1 - min(ratio, 1))) return index - def bagger_callback(self, msg, topic): + def bagger_callback(self, msg: genpy.Message, topic: str): """ Adds incoming messages to the appropriate topic and removes older messages if necessary. @@ -327,8 +322,8 @@ def bagger_callback(self, msg, topic): function does nothing if streaming is not active. Args: - msg (msg): The incoming message. - topic (topic): The topic to which the incoming message will be added. + msg (genpy.Message): The incoming message. + topic (str): The topic to which the incoming message will be added. """ if not self.streaming: @@ -358,7 +353,7 @@ def bagger_callback(self, msg, topic): self.topic_messages[topic].popleft() time_diff = self.get_topic_duration(topic) - def get_topic_message_count(self, topic): + def get_topic_message_count(self, topic: str): """ Return number of messages available in a topic @@ -367,8 +362,7 @@ def get_topic_message_count(self, topic): of messages. Returns: - len(self.topic_messages[topic]) (int): The number of messages - available in the specified topic. + int: The number of messages available in the specified topic. """ return len(self.topic_messages[topic]) @@ -378,10 +372,8 @@ def get_total_message_count(self): Returns total number of messages across all topics Returns: - total_message_count (int): The total number of messages available in - all topics. + int: The total number of messages available in all topics. """ - total_message_count = 0 for topic in self.topic_messages: total_message_count = total_message_count + self.get_topic_message_count( @@ -395,13 +387,13 @@ def _get_default_filename(self): Uses the current date and time to create a default bag name. Returns: - bag name (str): The default bag name constructed using format date-time. + str: The default bag name constructed using format date-time. """ return ( str(datetime.date.today()) + "-" + str(datetime.datetime.now().time())[0:8] ) - def get_bag_name(self, filename=""): + def get_bag_name(self, filename: str = ""): """ Create ros bag save directory If no bag name is provided, the current date/time is used as default. @@ -412,8 +404,7 @@ def get_bag_name(self, filename=""): used. Returns: - os.path.join(bag_dir, bag_name) (str): A string representing the path - of the ros bag file. + str: A string representing the path of the ros bag file. """ # If directory param is not set, default to $HOME/bags/ default_dir = self.dir @@ -438,7 +429,7 @@ def get_bag_name(self, filename=""): bag_name = bag_name + ".bag" return os.path.join(bag_dir, bag_name) - def start_bagging(self, req): + def start_bagging(self, req: BagOnlineGoal): """ Writes collected data to a bag file. @@ -447,7 +438,7 @@ def start_bagging(self, req): If bagging is already false because of an active call to this service. Args: - req (msg): The bagging request information. + req (BagOnlineGoal): The bagging request information. Raises: IOError: A problem occurs when opening or closing the bag file.