Skip to content

Commit

Permalink
Consolidate read/write methods in sessions modules to the `session.…
Browse files Browse the repository at this point in the history
…Session` parent class. (#75)

* Move 'read' and 'write' to session.Session

* Move common session USB code to a BaseUSBSession class

* Formatting

* Create and use MessageBasedSession

* Use MessageBasedSession on serial.SerialInstrumentSession
  • Loading branch information
dougthor42 authored Mar 17, 2023
1 parent d06bc89 commit 2aab58b
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 189 deletions.
45 changes: 1 addition & 44 deletions pyvisa_sim/sessions/gpib.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
:license: MIT, see LICENSE for more details.
"""
import time
from typing import Tuple

from pyvisa import constants, rname

from . import session


@session.Session.register(constants.InterfaceType.gpib, "INSTR")
class GPIBInstrumentSession(session.Session):
class GPIBInstrumentSession(session.MessageBasedSession):
parsed: rname.GPIBInstr

def after_parsing(self) -> None:
Expand All @@ -27,43 +24,3 @@ def after_parsing(self) -> None:
if self.parsed.secondary_address is not None
else constants.VI_NO_SEC_ADDR
)

def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:
end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar)
enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled)
timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value)
timeout /= 1000

start = time.time()

out = b""

while time.time() - start <= timeout:
last = self.device.read()

if not last:
time.sleep(0.01)
continue

out += last

if enabled:
if len(out) > 0 and out[-1] == end_char:
return out, constants.StatusCode.success_termination_character_read

if len(out) == count:
return out, constants.StatusCode.success_max_count_read
else:
return out, constants.StatusCode.error_timeout

def write(self, data: bytes) -> Tuple[int, constants.StatusCode]:
send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled)

for i in range(len(data)):
self.device.write(data[i : i + 1])

if send_end:
# EOM4882
pass

return len(data), constants.StatusCode.success
2 changes: 1 addition & 1 deletion pyvisa_sim/sessions/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


@session.Session.register(constants.InterfaceType.asrl, "INSTR")
class SerialInstrumentSession(session.Session):
class SerialInstrumentSession(session.MessageBasedSession):
parsed: rname.ASRLInstr

def after_parsing(self) -> None:
Expand Down
45 changes: 45 additions & 0 deletions pyvisa_sim/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
:license: MIT, see LICENSE for more details.
"""
import time
from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar

from pyvisa import attributes, constants, rname, typing
Expand Down Expand Up @@ -202,3 +203,47 @@ def set_attribute(
return constants.StatusCode.error_nonsupported_attribute_state

return constants.StatusCode.success


class MessageBasedSession(Session):
"""Base class for Message-Based sessions that support ``read`` and ``write`` methods."""

def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:
end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar)
enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled)
timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value)
timeout /= 1000

start = time.time()

out = b""

while time.time() - start <= timeout:
last = self.device.read()

if not last:
time.sleep(0.01)
continue

out += last

if enabled:
if len(out) > 0 and out[-1] == end_char:
return out, constants.StatusCode.success_termination_character_read

if len(out) == count:
return out, constants.StatusCode.success_max_count_read
else:
return out, constants.StatusCode.error_timeout

def write(self, data: bytes) -> Tuple[int, constants.StatusCode]:
send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled)

for i in range(len(data)):
self.device.write(data[i : i + 1])

if send_end:
# EOM4882
pass

return len(data), constants.StatusCode.success
45 changes: 1 addition & 44 deletions pyvisa_sim/sessions/tcpip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,14 @@
:license: MIT, see LICENSE for more details.
"""
import time
from typing import Tuple

from pyvisa import constants, rname

from . import session


class BaseTCPIPSession(session.Session):
class BaseTCPIPSession(session.MessageBasedSession):
"""Base class for TCPIP sessions."""

def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:
end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar)
enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled)
timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value)
timeout /= 1000

start = time.time()

out = b""

while time.time() - start <= timeout:
last = self.device.read()

if not last:
time.sleep(0.01)
continue

out += last

if enabled:
if len(out) > 0 and out[-1] == end_char:
return out, constants.StatusCode.success_termination_character_read

if len(out) == count:
return out, constants.StatusCode.success_max_count_read
else:
return out, constants.StatusCode.error_timeout

def write(self, data: bytes) -> Tuple[int, constants.StatusCode]:
send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled)

for i in range(len(data)):
self.device.write(data[i : i + 1])

if send_end:
# EOM 4882
pass

return len(data), constants.StatusCode.success


@session.Session.register(constants.InterfaceType.tcpip, "INSTR")
class TCPIPInstrumentSession(BaseTCPIPSession):
Expand Down
107 changes: 7 additions & 100 deletions pyvisa_sim/sessions/usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
:license: MIT, see LICENSE for more details.
"""
import time
from typing import Tuple
from typing import Union

from pyvisa import constants, rname

from . import session


@session.Session.register(constants.InterfaceType.usb, "INSTR")
class USBInstrumentSession(session.Session):
parsed: rname.USBInstr
class BaseUSBSession(session.MessageBasedSession):
parsed: Union[rname.USBInstr, rname.USBRaw]

def after_parsing(self) -> None:
self.attrs[constants.ResourceAttribute.interface_number] = int(
Expand All @@ -32,103 +30,12 @@ def after_parsing(self) -> None:
self.parsed.board
)

def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:
end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar)
enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled)
timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value)
timeout /= 1000

start = time.time()

out = b""

while time.time() - start <= timeout:
last = self.device.read()

if not last:
time.sleep(0.01)
continue

out += last

if enabled:
if len(out) > 0 and out[-1] == end_char:
return out, constants.StatusCode.success_termination_character_read

if len(out) == count:
return out, constants.StatusCode.success_max_count_read
else:
return out, constants.StatusCode.error_timeout

def write(self, data: bytes) -> Tuple[int, constants.StatusCode]:
send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled)

for i in range(len(data)):
self.device.write(data[i : i + 1])

if send_end:
# EOM 4882
pass

return len(data), constants.StatusCode.success
@session.Session.register(constants.InterfaceType.usb, "INSTR")
class USBInstrumentSession(BaseUSBSession):
parsed: rname.USBInstr


@session.Session.register(constants.InterfaceType.usb, "RAW")
class USBRawSession(session.Session):
class USBRawSession(BaseUSBSession):
parsed: rname.USBRaw

def after_parsing(self) -> None:
self.attrs[constants.ResourceAttribute.interface_number] = int(
self.parsed.board
)
self.attrs[
constants.ResourceAttribute.manufacturer_id
] = self.parsed.manufacturer_id
self.attrs[constants.ResourceAttribute.model_code] = self.parsed.model_code
self.attrs[
constants.ResourceAttribute.usb_serial_number
] = self.parsed.serial_number
self.attrs[constants.ResourceAttribute.usb_interface_number] = int(
self.parsed.board
)

def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:
end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar)
enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled)
timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value)
timeout /= 1000

now = start = time.time()

out = b""

while now - start <= timeout:
last = self.device.read()

if not last:
time.sleep(0.01)
now = time.time()
continue

out += last

if enabled:
if len(out) > 0 and out[-1] == end_char:
return out, constants.StatusCode.success_termination_character_read

if len(out) == count:
return out, constants.StatusCode.success_max_count_read
else:
return out, constants.StatusCode.error_timeout

def write(self, data: bytes) -> Tuple[int, constants.StatusCode]:
send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled)

for i in range(len(data)):
self.device.write(data[i : i + 1])

if send_end:
# EOM 4882
pass

return len(data), constants.StatusCode.success

0 comments on commit 2aab58b

Please sign in to comment.