Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable adding unsupported device types via the DeviceManager #262

Merged
merged 2 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ Valid subsections within a version are:

Things to be included in the next release go here.

### Added

- Added a new method to the `DeviceManager` class, `add_unsupported_device()`, which enables adding an unsupported device type.

---

## v2.1.0 (2024-07-31)
Expand Down
63 changes: 62 additions & 1 deletion examples/miscellaneous/custom_device_driver_support.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,76 @@
"""An example of external device support via a custom driver."""

from typing import Tuple, Union

from tm_devices import DeviceManager
from tm_devices.drivers import MSO5
from tm_devices.drivers.pi.pi_device import PIDevice
from tm_devices.drivers.pi.scopes.scope import Scope

# noinspection PyPep8Naming
from tm_devices.helpers import ReadOnlyCachedProperty as cached_property # noqa: N813


# Custom devices that inherit from a supported device type can be defined by inheriting from the
# specific device type class. This custom class must implement all abstract methods defined by the
# abstract parent classes.
class CustomScope(Scope):
"""Custom scope class."""

# This is an abstract method that must be implemented by the custom device driver
@cached_property
def total_channels(self) -> int:
return 4

def custom_method(self, value: str) -> None:
"""Add a custom method to the custom driver."""
print(f"{self.name}, {value=}")


# Custom devices that do not inherit from a supported device type can be defined by inheriting from
# a parent class further up the inheritance tree. This custom class must implement all abstract
# methods defined by the abstract parent classes.
class CustomDevice(PIDevice):
"""A custom device that is not one of the officially supported devices."""

# Custom device types not officially supported need to define what type of device they are.
_DEVICE_TYPE = "CustomDevice"

# This is an abstract property that must be implemented by the custom device driver.
# NOTE: The implementation of this example was copied from the base Scope class.
@property
def all_channel_names_list(self) -> Tuple[str, ...]:
return tuple(f"CH{x+1}" for x in range(self.total_channels))

# This is an abstract property that must be implemented by the custom device driver.
@cached_property
def total_channels(self) -> int:
return 4

# This is an abstract method that must be implemented by the custom device driver.
def expect_esr(self, esr: Union[int, str], error_string: str = "") -> Tuple[bool, str]:
# The contents of this method would need to be properly implemented,
# this is just example code. :)
return True, ""

# This is an abstract method that must be implemented by the custom device driver.
def get_eventlog_status(self) -> Tuple[bool, str]:
# The contents of this method would need to be properly implemented,
# this is just example code. :)
return True, ""

def custom_device_method(self, value: int) -> None:
"""Add a custom method to the custom device driver."""
print(f"{self.name}, {value=}")


# For VISA devices, the model series is based on the model that is returned from
# the ``*IDN?`` query. (See the ``tm_devices.helpers.get_model_series()`` function for details)
# For REST API devices, the model series is provided via the ``device_driver`` parameter in
# the configuration file, environment variable, or python code.
CUSTOM_DEVICE_DRIVERS = { # A mapping of custom model series strings to Python driver classes
"CustomModelSeries": CustomScope,
"CustomDeviceModelSeries": CustomDevice,
}


Expand All @@ -27,10 +79,19 @@ def custom_method(self, value: str) -> None:
mso5: MSO5 = device_manager.add_scope("192.168.0.1")
# Add the custom scope
custom_scope: CustomScope = device_manager.add_scope("192.168.0.2")
# Add the custom device that is a device type not officially supported
# NOTE: If using a config file or environment variable to define a device that is unsupported,
# the `device_type` key must be set to "UNSUPPORTED".
custom_device: CustomDevice = device_manager.add_unsupported_device("192.168.0.3")

# Custom drivers inherit all methods
custom_scope.expect_esr(0) # check for no errors
custom_scope.cleanup() # cleanup the custom scope

# Custom drivers can also use added methods
custom_scope.custom_method("value")

# Custom device types still inherit methods from their parent classes, though device type
# specific functionality is not defined by default
custom_device.expect_esr(0) # check for no errors
# Custom devices can also use any custom methods added to the custom class
custom_device.custom_device_method(10)
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ order-by-type = false
"examples/**" = [
"S101" # Use of assert detected
]
"examples/miscellaneous/custom_device_driver_support.py" = [
"ARG002", # Unused method argument
"D102" # Missing docstring in public method
]
"src/tm_devices/commands/**" = [
"A003", # Class attribute is shadowing a python builtin
"D104", # Missing docstring in public package
Expand Down
105 changes: 88 additions & 17 deletions src/tm_devices/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from tm_devices.components import DMConfigParser
from tm_devices.drivers.api.rest_api.margin_testers.margin_tester import MarginTester
from tm_devices.drivers.api.rest_api.rest_api_device import RESTAPIDevice
from tm_devices.drivers.device import Device
from tm_devices.drivers.pi.data_acquisition_systems.data_acquisition_system import (
DataAcquisitionSystem,
)
Expand All @@ -40,6 +41,7 @@
DeviceTypes,
DMConfigOptions,
get_model_series,
PACKAGE_NAME,
print_with_timestamp,
PYVISA_PY_BACKEND,
SerialConfig,
Expand All @@ -62,37 +64,37 @@
from pyvisa.resources import MessageBasedResource
from typing_extensions import Self

from tm_devices.drivers.device import Device

####################################################################################################
# Type Aliases
####################################################################################################
AFGAlias = TypeVar("AFGAlias", bound=AFG, default=AFG) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
AFGAlias = TypeVar("AFGAlias", bound=AFG, default=AFG)
"""An alias to a specific Arbitrary Function Generator Python driver."""
AWGAlias = TypeVar("AWGAlias", bound=AWG, default=AWG) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
AWGAlias = TypeVar("AWGAlias", bound=AWG, default=AWG)
"""An alias to a specific Arbitrary Waveform Generator Python driver."""
DataAcquisitionSystemAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
DataAcquisitionSystemAlias = TypeVar(
"DataAcquisitionSystemAlias", bound=DataAcquisitionSystem, default=DataAcquisitionSystem
)
"""An alias to a specific Data Acquisition System Python driver."""
DigitalMultimeterAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
DigitalMultimeterAlias = TypeVar(
"DigitalMultimeterAlias", bound=DigitalMultimeter, default=DigitalMultimeter
)
"""An alias to a specific Digital Multimeter Python driver."""
ScopeAlias = TypeVar("ScopeAlias", bound=Scope, default=Scope) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
ScopeAlias = TypeVar("ScopeAlias", bound=Scope, default=Scope)
"""An alias to a specific Scope driver."""
MarginTesterAlias = TypeVar("MarginTesterAlias", bound=MarginTester, default=MarginTester) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
MarginTesterAlias = TypeVar("MarginTesterAlias", bound=MarginTester, default=MarginTester)
"""An alias to a specific Margin Tester Python driver."""
PowerSupplyUnitAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
PowerSupplyUnitAlias = TypeVar(
"PowerSupplyUnitAlias", bound=PowerSupplyUnit, default=PowerSupplyUnit
)
"""An alias to a specific Power Supply Unit Python driver."""
SourceMeasureUnitAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
SourceMeasureUnitAlias = TypeVar(
"SourceMeasureUnitAlias", bound=SourceMeasureUnit, default=SourceMeasureUnit
)
"""An alias to a specific Source Measure Unit Python driver."""
SystemsSwitchAlias = TypeVar("SystemsSwitchAlias", bound=SystemsSwitch, default=SystemsSwitch) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
SystemsSwitchAlias = TypeVar("SystemsSwitchAlias", bound=SystemsSwitch, default=SystemsSwitch)
"""An alias to a specific Systems Switch Python driver."""
UnsupportedDeviceAlias = TypeVar("UnsupportedDeviceAlias", bound=Device, default=Device)
"""An alias to a custom device driver for an unsupported device type."""


####################################################################################################
Expand Down Expand Up @@ -618,6 +620,48 @@ def add_ss(
),
)

def add_unsupported_device(
self,
address: str,
*,
alias: Optional[str] = None,
connection_type: Optional[str] = None,
port: Optional[int] = None,
gpib_board_number: Optional[int] = None,
) -> UnsupportedDeviceAlias:
"""Add a custom device to the DeviceManager that is not an officially supported device type.

!!! warning
This should not be used unless absolutely necessary.

Args:
address: The address of the device, either an IP address or hostname. If the connection
type is ``"USB"`` then the address must be specified as ``"<model>-<serial>"``.
alias: An optional alias to use to refer to the device. If no alias is provided,
the device type and number can be used to access the device instead.
connection_type: The type of connection to use for VISA, defaults to TCPIP, not needed
when the address is a visa resource expression since the connection type is parsed
from the address string.
port: The port to use when creating a socket connection.
gpib_board_number: The GPIB board number (also referred to as a controller) to be used
when making a GPIB connection (defaults to 0).

Returns:
The custom device driver.
"""
self.__protect_access()
return cast(
UnsupportedDeviceAlias,
self._add_device(
device_type=DeviceTypes.UNSUPPORTED.value,
address=address,
alias=alias,
connection_type=connection_type,
port=port,
gpib_board_number=gpib_board_number,
),
)

def cleanup_all_devices(self) -> None:
"""Cleanup and reset all devices."""
self.__protect_access()
Expand Down Expand Up @@ -818,7 +862,11 @@ def get_device(
message = f"{device_name} was not found in the device driver dictionary."
raise LookupError(message) from error
# double check that the device is the correct type
if device_type is not None and device.device_type != device_type.upper():
if (
device_type is not None
and device.device_type != device_type.upper()
and device.config_entry.device_type != DeviceTypes.UNSUPPORTED
):
message = (
f'A device of type "{device_type}" was specified to be accessed, '
f'but the accessed device type is actually of type "{device.device_type}".'
Expand Down Expand Up @@ -954,7 +1002,7 @@ def load_config_file(self, config_file_path: Union[str, os.PathLike[str]]) -> No
print_with_timestamp("Opening Connections to Devices")
for device_name, device_config in self.__config.devices.items():
if device_name not in self.__devices:
self.__create_device(device_name, device_config)
self.__create_device(device_name, device_config, 3)

def open(self) -> bool:
"""Reopen all devices if the DeviceManager has been previously closed.
Expand All @@ -974,7 +1022,7 @@ def open(self) -> bool:
if self.__config.devices:
print_with_timestamp("Opening Connections to Devices")
for device_name, device_config in self.__config.devices.items():
self.__create_device(device_name, device_config)
self.__create_device(device_name, device_config, 3)
if self.__setup_cleanup_enabled:
self.cleanup_all_devices()
self.__is_open = True
Expand Down Expand Up @@ -1107,7 +1155,7 @@ def _add_device( # noqa: PLR0913
config_dict["gpib_board_number"] = gpib_board_number
new_device_name, new_device_config = self.__config.add_device(**config_dict) # pyright: ignore[reportArgumentType]

return self.__create_device(new_device_name, new_device_config)
return self.__create_device(new_device_name, new_device_config, 4)

@staticmethod
def __clear_visa_output_buffer_and_get_idn(visa_resource: MessageBasedResource) -> str:
Expand Down Expand Up @@ -1179,13 +1227,17 @@ def __clear_visa_output_buffer_and_get_idn(visa_resource: MessageBasedResource)
return idn_response

def __create_device(
self, device_config_name: str, device_config: DeviceConfigEntry
self,
device_config_name: str,
device_config: DeviceConfigEntry,
warning_stacklevel: int,
) -> Union[RESTAPIDevice, PIDevice]:
"""Create a new device driver and add it to the device dictionary.

Args:
device_config_name: The name returned when creating the device_config.
device_config: The dataclass holding the device configuration information.
warning_stacklevel: The stacklevel of the warning to raise for unsupported device types.

Returns:
The created device driver.
Expand All @@ -1207,6 +1259,14 @@ def __create_device(
device_drivers = DEVICE_DRIVER_MODEL_MAPPING

alias_string = f' "{device_config.alias}"' if device_config.alias else ""
if device_config.device_type == DeviceTypes.UNSUPPORTED:
warnings.warn(
f"An unsupported device type is being added to the {self.__class__.__name__}. "
f"Not all functionality will be available in the device driver. "
f"Please consider contributing to {PACKAGE_NAME} to implement official "
f"support for this device type.",
stacklevel=warning_stacklevel,
)
print_with_timestamp(f"Creating Connection to {device_config_name}{alias_string}")
new_device: Union[RESTAPIDevice, PIDevice]
if device_config.connection_type == ConnectionTypes.REST_API:
Expand Down Expand Up @@ -1239,9 +1299,20 @@ def __create_device(
self.__devices[device_config_name] = new_device
if device_config.alias:
self.__devices[device_config.alias] = new_device
if new_device.config_entry.device_type == DeviceTypes.UNSUPPORTED:
# Add an alias to the AliasDict which contains the device_type that the custom device
# driver defines, which may be different from the device type defined in the config,
# which is "UNSUPPORTED". This allows the device to be removed from the config and
# DeviceManager when necessary.
self.__devices[f"{new_device.device_type} {new_device.device_number}".upper()] = (
ldantek marked this conversation as resolved.
Show resolved Hide resolved
new_device
)

# double check created device is correct type
if new_device.device_type != new_device.config_entry.device_type.value:
if (
new_device.device_type != new_device.config_entry.device_type.value
and new_device.config_entry.device_type != DeviceTypes.UNSUPPORTED
):
self.remove_device(
alias=new_device.config_entry.alias,
device_type=new_device.config_entry.device_type.value,
Expand Down
4 changes: 0 additions & 4 deletions src/tm_devices/drivers/api/rest_api/margin_testers/tmt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,3 @@ def _check_api_connection(self) -> bool:
def _cleanup(self) -> None:
"""Perform the cleanup defined for the device."""
# TODO: implement

def _reboot(self) -> None:
"""Reboot the device."""
# TODO: implement
1 change: 0 additions & 1 deletion src/tm_devices/drivers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def _open(self) -> bool:
A boolean indicating if device connected successfully.
"""

@abstractmethod
def _reboot(self) -> None:
"""Perform the actual rebooting code."""
raise NotImplementedError(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Base Data Acquisition (DAQ) device driver module."""

import inspect

from abc import ABC

from tm_devices.drivers.device import family_base_class
Expand All @@ -18,14 +16,3 @@ class DataAcquisitionSystem(TSPDevice, ABC):
################################################################################################
# Private Methods
################################################################################################
def _reboot(self) -> None:
"""Perform the actual rebooting code.

Raises:
NotImplementedError: Indicates the current driver has not implemented this method.
"""
# TODO: implement
raise NotImplementedError(
f"``.{inspect.currentframe().f_code.co_name}()``" # pyright: ignore[reportOptionalMemberAccess]
f" is not yet implemented for the {self.__class__.__name__} driver"
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Base Digital Multimeter (DMM) device driver module."""

import inspect

from abc import ABC

from tm_devices.drivers.pi.tsp_device import TSPDevice
Expand All @@ -24,14 +22,3 @@ class DigitalMultimeter(TSPDevice, ABC):
################################################################################################
# Private Methods
################################################################################################
def _reboot(self) -> None:
"""Perform the actual rebooting code.

Raises:
NotImplementedError: Indicates the current driver has not implemented this method.
"""
# TODO: implement
raise NotImplementedError(
f"``.{inspect.currentframe().f_code.co_name}()``" # pyright: ignore[reportOptionalMemberAccess]
f" is not yet implemented for the {self.__class__.__name__} driver"
)
Loading
Loading