From 1ecdfd7ca35f7c6847ef3c8044b6e70bc75ebff3 Mon Sep 17 00:00:00 2001 From: Nils Weiss Date: Wed, 10 Feb 2021 12:09:45 +0100 Subject: [PATCH] Use PriorityQueue internally in PythonCANSockets (#3061) * This PR adds a priority queue on python-can sockets to ensure the right order of packets. Incorrect order of CAN packets in PythonCANSockets are a long existing bug in Scapy and probably the reason for most instability in Unit Tests. This PR consists of the following changes: * Use PriorityQueue in python_can Socket Multiplexers * Introduce prio counter to not have message inversion on identical time stamps * enable isotp.uts on non root CI systems to check stability of change * add additional unit test * Enable unstable isotpscan tests to see if this PR has an effect on them * Validate if PriorityQueue is causing the stability I'm seeing in the tests * Validate again. This time remove prio code which shouldn't had any effect * Revert "Validate again. This time remove prio code which shouldn't had any effect" This reverts commit bd1d868d0277b7ae43a785c23ebd6e27b3d1b753. * Revert "Validate if PriorityQueue is causing the stability I'm seeing in the tests" This reverts commit a25c579d0d6b6d99b53a3670ca8d0d8a07f13fbf. * disable some long tests * fix rebase bug * minor addition to __lt__ operator * Add a comment why priority is necessary * fix unit test --- scapy/contrib/cansocket_python_can.py | 67 ++++++++++++++++++++++----- scapy/contrib/isotp.py | 2 + test/contrib/isotp.uts | 15 ++++++ test/contrib/isotpscan.uts | 14 +++--- 4 files changed, 79 insertions(+), 19 deletions(-) diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py index a82d526b451..456091c818a 100644 --- a/scapy/contrib/cansocket_python_can.py +++ b/scapy/contrib/cansocket_python_can.py @@ -13,7 +13,6 @@ import time import struct import threading -import copy from functools import reduce from operator import add @@ -23,26 +22,70 @@ from scapy.layers.can import CAN from scapy.error import warning from scapy.modules.six.moves import queue +from scapy.compat import Any, List from can import Message as can_Message from can import CanError as can_CanError from can import BusABC as can_BusABC from can.interface import Bus as can_Bus +class PriotizedCanMessage(object): + """Helper object for comparison of CAN messages. If the timestamps of two + messages are equal, the counter value of a priority counter, is used + for comparison. It's only important that this priority counter always + get increased for every CAN message in the receive heapq. This compensates + a low resolution of `time.time()` on some operating systems. + """ + def __init__(self, msg, count): + # type: (can_Message, int) -> None + self.msg = msg + self.count = count + + def __eq__(self, other): + # type: (Any) -> bool + if not isinstance(other, PriotizedCanMessage): + return False + return self.msg.timestamp == other.msg.timestamp and \ + self.count == other.count + + def __lt__(self, other): + # type: (Any) -> bool + if not isinstance(other, PriotizedCanMessage): + return False + return self.msg.timestamp < other.msg.timestamp or \ + (self.msg.timestamp == other.msg.timestamp and + self.count < other.count) + + def __le__(self, other): + # type: (Any) -> bool + return self == other or self < other + + def __gt__(self, other): + # type: (Any) -> bool + return not self <= other + + def __ge__(self, other): + # type: (Any) -> bool + return not self < other + + class SocketMapper: def __init__(self, bus, sockets): - self.bus = bus # type: can_BusABC - self.sockets = sockets # type: list[SocketWrapper] + # type: (can_BusABC, List[SocketWrapper]) -> None + self.bus = bus + self.sockets = sockets def mux(self): while True: + prio_count = 0 try: msg = self.bus.recv(timeout=0) if msg is None: return for sock in self.sockets: if sock._matches_filters(msg): - sock.rx_queue.put(copy.copy(msg)) + prio_count += 1 + sock.rx_queue.put(PriotizedCanMessage(msg, prio_count)) except Exception as e: warning("[MUX] python-can exception caught: %s" % e) @@ -57,7 +100,7 @@ def __new__(cls): SocketsPool.__instance.pool_mutex = threading.Lock() return SocketsPool.__instance - def internal_send(self, sender, msg): + def internal_send(self, sender, msg, prio=0): with self.pool_mutex: try: mapper = self.pool[sender.name] @@ -68,9 +111,7 @@ def internal_send(self, sender, msg): if not sock._matches_filters(msg): continue - m = copy.copy(msg) - m.timestamp = time.time() - sock.rx_queue.put(m) + sock.rx_queue.put(PriotizedCanMessage(msg, prio)) except KeyError: warning("[SND] Socket %s not found in pool" % sender.name) except can_CanError as e: @@ -118,19 +159,22 @@ class SocketWrapper(can_BusABC): def __init__(self, *args, **kwargs): super(SocketWrapper, self).__init__(*args, **kwargs) - self.rx_queue = queue.Queue() # type: queue.Queue[can_Message] + self.rx_queue = queue.PriorityQueue() # type: queue.PriorityQueue[PriotizedCanMessage] # noqa: E501 self.name = None + self.prio_counter = 0 SocketsPool().register(self, *args, **kwargs) def _recv_internal(self, timeout): SocketsPool().multiplex_rx_packets() try: - return self.rx_queue.get(block=True, timeout=timeout), True + pm = self.rx_queue.get(block=True, timeout=timeout) + return pm.msg, True except queue.Empty: return None, True def send(self, msg, timeout=None): - SocketsPool().internal_send(self, msg) + self.prio_counter += 1 + SocketsPool().internal_send(self, msg, self.prio_counter) def shutdown(self): SocketsPool().unregister(self) @@ -165,6 +209,7 @@ def send(self, x): arbitration_id=x.identifier, dlc=x.length, data=bytes(x)[8:]) + msg.timestamp = time.time() try: x.sent_time = time.time() except AttributeError: diff --git a/scapy/contrib/isotp.py b/scapy/contrib/isotp.py index 084aee6f97c..62271a63e7f 100644 --- a/scapy/contrib/isotp.py +++ b/scapy/contrib/isotp.py @@ -1234,8 +1234,10 @@ def _tx_timer_handler(self): if self.tx_gap == 0: continue else: + # stop and wait for tx gap self.tx_timeout_handle = TimeoutScheduler.schedule( self.tx_gap, self._tx_timer_handler) + return def on_recv(self, cf): """Function that must be called every time a CAN frame is received, to diff --git a/test/contrib/isotp.uts b/test/contrib/isotp.uts index 8f0769dd6d3..e3e2fe0c8db 100644 --- a/test/contrib/isotp.uts +++ b/test/contrib/isotp.uts @@ -1657,6 +1657,21 @@ assert len(result) == 1 assert(result[0].data == isotp.data) += Two ISOTPSockets at the same time, sending and receiving with tx_gap + +with new_can_socket0() as cs1, ISOTPSocket(cs1, sid=0x641, did=0x241, rx_separation_time_min=1) as s1, \ + new_can_socket0() as cs2, ISOTPSocket(cs2, sid=0x241, did=0x641) as s2: + isotp = ISOTP(data=b"\x10\x25" * 43) + def sender(): + s2.send(isotp) + t = Thread(target=sender) + result = s1.sniff(count=1, timeout=5, started_callback=t.start) + t.join(timeout=5) + +assert len(result) == 1 +assert(result[0].data == isotp.data) + + = Two ISOTPSockets at the same time, multiple sends/receives with new_can_socket0() as cs1, ISOTPSocket(cs1, sid=0x641, did=0x241) as s1, \ new_can_socket0() as cs2, ISOTPSocket(cs2, sid=0x241, did=0x641) as s2: diff --git a/test/contrib/isotpscan.uts b/test/contrib/isotpscan.uts index ca5d01ff89c..f4bbcc5bf58 100644 --- a/test/contrib/isotpscan.uts +++ b/test/contrib/isotpscan.uts @@ -1,14 +1,12 @@ % Regression tests for ISOTPScan - -# Currently too unstable - -~ disabled +* Some tests are disabled to lower the CI utilitzation + Configuration ~ conf = Imports import scapy.modules.six as six +from scapy.contrib.isotp import send_multiple_ext, filter_periodic_packets, scan_extended, scan if six.PY3: exec(open("test/contrib/automotive/interface_mockup.py").read()) @@ -737,15 +735,15 @@ test_dynamic(test_isotpscan_text_extended_can_id) test_dynamic(test_isotpscan_code) = Test ISOTPScan with noise (output_format=code) - +~ disabled test_dynamic(test_isotpscan_code_noise) = Test extended ISOTPScan(output_format=code) - +~ disabled test_dynamic(test_extended_isotpscan_code) = Test extended ISOTPScan(output_format=code) extended_can_id - +~ disabled test_dynamic(test_extended_isotpscan_code_extended_can_id) = Test ISOTPScan(output_format=None) @@ -765,7 +763,7 @@ test_dynamic(test_extended_isotpscan_none) test_dynamic(test_isotpscan_none_random_ids) = Test ISOTPScan(output_format=None) random IDs padding - +~ disabled test_dynamic(test_isotpscan_none_random_ids_padding) + Cleanup