From 2aab58ba092043a89628787729175acbba8695c8 Mon Sep 17 00:00:00 2001 From: Douglas Thor Date: Fri, 17 Mar 2023 13:05:11 -0700 Subject: [PATCH] Consolidate read/write methods in `sessions` modules to the `session.Session` parent class. (#75) * Move 'read' and 'write' to session.Session * Move common session USB code to a BaseUSBSession class * Formatting * Create and use MessageBasedSession * Use MessageBasedSession on serial.SerialInstrumentSession --- pyvisa_sim/sessions/gpib.py | 45 +------------- pyvisa_sim/sessions/serial.py | 2 +- pyvisa_sim/sessions/session.py | 45 ++++++++++++++ pyvisa_sim/sessions/tcpip.py | 45 +------------- pyvisa_sim/sessions/usb.py | 107 +++------------------------------ 5 files changed, 55 insertions(+), 189 deletions(-) diff --git a/pyvisa_sim/sessions/gpib.py b/pyvisa_sim/sessions/gpib.py index ded9110..7a450a4 100644 --- a/pyvisa_sim/sessions/gpib.py +++ b/pyvisa_sim/sessions/gpib.py @@ -5,16 +5,13 @@ :license: MIT, see LICENSE for more details. """ -import time -from typing import Tuple - from pyvisa import constants, rname from . import session @session.Session.register(constants.InterfaceType.gpib, "INSTR") -class GPIBInstrumentSession(session.Session): +class GPIBInstrumentSession(session.MessageBasedSession): parsed: rname.GPIBInstr def after_parsing(self) -> None: @@ -27,43 +24,3 @@ def after_parsing(self) -> None: if self.parsed.secondary_address is not None else constants.VI_NO_SEC_ADDR ) - - def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: - end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) - enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled) - timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value) - timeout /= 1000 - - start = time.time() - - out = b"" - - while time.time() - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data: bytes) -> Tuple[int, constants.StatusCode]: - send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - - if send_end: - # EOM4882 - pass - - return len(data), constants.StatusCode.success diff --git a/pyvisa_sim/sessions/serial.py b/pyvisa_sim/sessions/serial.py index f1dbbf0..13b1668 100644 --- a/pyvisa_sim/sessions/serial.py +++ b/pyvisa_sim/sessions/serial.py @@ -15,7 +15,7 @@ @session.Session.register(constants.InterfaceType.asrl, "INSTR") -class SerialInstrumentSession(session.Session): +class SerialInstrumentSession(session.MessageBasedSession): parsed: rname.ASRLInstr def after_parsing(self) -> None: diff --git a/pyvisa_sim/sessions/session.py b/pyvisa_sim/sessions/session.py index 68d2032..2779e89 100644 --- a/pyvisa_sim/sessions/session.py +++ b/pyvisa_sim/sessions/session.py @@ -5,6 +5,7 @@ :license: MIT, see LICENSE for more details. """ +import time from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar from pyvisa import attributes, constants, rname, typing @@ -202,3 +203,47 @@ def set_attribute( return constants.StatusCode.error_nonsupported_attribute_state return constants.StatusCode.success + + +class MessageBasedSession(Session): + """Base class for Message-Based sessions that support ``read`` and ``write`` methods.""" + + def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: + end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) + enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled) + timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value) + timeout /= 1000 + + start = time.time() + + out = b"" + + while time.time() - start <= timeout: + last = self.device.read() + + if not last: + time.sleep(0.01) + continue + + out += last + + if enabled: + if len(out) > 0 and out[-1] == end_char: + return out, constants.StatusCode.success_termination_character_read + + if len(out) == count: + return out, constants.StatusCode.success_max_count_read + else: + return out, constants.StatusCode.error_timeout + + def write(self, data: bytes) -> Tuple[int, constants.StatusCode]: + send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled) + + for i in range(len(data)): + self.device.write(data[i : i + 1]) + + if send_end: + # EOM4882 + pass + + return len(data), constants.StatusCode.success diff --git a/pyvisa_sim/sessions/tcpip.py b/pyvisa_sim/sessions/tcpip.py index b9fb231..71763e5 100644 --- a/pyvisa_sim/sessions/tcpip.py +++ b/pyvisa_sim/sessions/tcpip.py @@ -5,57 +5,14 @@ :license: MIT, see LICENSE for more details. """ -import time -from typing import Tuple - from pyvisa import constants, rname from . import session -class BaseTCPIPSession(session.Session): +class BaseTCPIPSession(session.MessageBasedSession): """Base class for TCPIP sessions.""" - def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: - end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) - enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled) - timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value) - timeout /= 1000 - - start = time.time() - - out = b"" - - while time.time() - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data: bytes) -> Tuple[int, constants.StatusCode]: - send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - - if send_end: - # EOM 4882 - pass - - return len(data), constants.StatusCode.success - @session.Session.register(constants.InterfaceType.tcpip, "INSTR") class TCPIPInstrumentSession(BaseTCPIPSession): diff --git a/pyvisa_sim/sessions/usb.py b/pyvisa_sim/sessions/usb.py index 8042b3b..025f7b2 100644 --- a/pyvisa_sim/sessions/usb.py +++ b/pyvisa_sim/sessions/usb.py @@ -5,17 +5,15 @@ :license: MIT, see LICENSE for more details. """ -import time -from typing import Tuple +from typing import Union from pyvisa import constants, rname from . import session -@session.Session.register(constants.InterfaceType.usb, "INSTR") -class USBInstrumentSession(session.Session): - parsed: rname.USBInstr +class BaseUSBSession(session.MessageBasedSession): + parsed: Union[rname.USBInstr, rname.USBRaw] def after_parsing(self) -> None: self.attrs[constants.ResourceAttribute.interface_number] = int( @@ -32,103 +30,12 @@ def after_parsing(self) -> None: self.parsed.board ) - def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: - end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) - enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled) - timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value) - timeout /= 1000 - - start = time.time() - - out = b"" - - while time.time() - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data: bytes) -> Tuple[int, constants.StatusCode]: - send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - if send_end: - # EOM 4882 - pass - - return len(data), constants.StatusCode.success +@session.Session.register(constants.InterfaceType.usb, "INSTR") +class USBInstrumentSession(BaseUSBSession): + parsed: rname.USBInstr @session.Session.register(constants.InterfaceType.usb, "RAW") -class USBRawSession(session.Session): +class USBRawSession(BaseUSBSession): parsed: rname.USBRaw - - def after_parsing(self) -> None: - self.attrs[constants.ResourceAttribute.interface_number] = int( - self.parsed.board - ) - self.attrs[ - constants.ResourceAttribute.manufacturer_id - ] = self.parsed.manufacturer_id - self.attrs[constants.ResourceAttribute.model_code] = self.parsed.model_code - self.attrs[ - constants.ResourceAttribute.usb_serial_number - ] = self.parsed.serial_number - self.attrs[constants.ResourceAttribute.usb_interface_number] = int( - self.parsed.board - ) - - def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: - end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) - enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled) - timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value) - timeout /= 1000 - - now = start = time.time() - - out = b"" - - while now - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - now = time.time() - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data: bytes) -> Tuple[int, constants.StatusCode]: - send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - - if send_end: - # EOM 4882 - pass - - return len(data), constants.StatusCode.success