Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update measurement link examples to match dcpower example changes. #284

Merged
merged 23 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions examples/nidaqmx_analog_input/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@

import logging
import pathlib
from typing import Any, Callable, Dict, NamedTuple, TypeVar
import types
from typing import Any, Callable, Dict, NamedTuple, TypeVar, Optional

import click
import grpc

import ni_measurementlink_service as nims
from ni_measurementlink_service import session_management
from ni_measurementlink_service._internal.discovery_client import DiscoveryClient
from ni_measurementlink_service._internal.stubs.ni.measurementlink.pinmap.v1 import (
pin_map_service_pb2,
pin_map_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.service import GrpcChannelPool
from ni_measurementlink_service.measurement.service import (
GrpcChannelPool,
MeasurementService,
)


class ServiceOptions(NamedTuple):
Expand Down Expand Up @@ -231,3 +236,32 @@ def use_simulation_option(default: bool) -> Callable[[F], F]:
is_flag=True,
help="Use simulated instruments.",
)


def get_grpc_device_channel(
measurement_service: MeasurementService,
driver_module: types.ModuleType,
service_options: ServiceOptions,
) -> Optional[grpc.Channel]:
"""Returns driver specific grpc device channel."""
if service_options.use_grpc_device:
if service_options.grpc_device_address:
return measurement_service.channel_pool.get_channel(service_options.grpc_device_address)

return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
return None


def create_session_management_client(
measurement_service: MeasurementService,
) -> nims.session_management.Client:
"""Return created session management client."""
return nims.session_management.Client(
grpc_channel=measurement_service.get_channel(
provided_interface=nims.session_management.GRPC_SERVICE_INTERFACE_NAME,
service_class=nims.session_management.GRPC_SERVICE_CLASS,
)
)
26 changes: 26 additions & 0 deletions examples/nidaqmx_analog_input/_nidaqmx_helpers.py
subash-suresh marked this conversation as resolved.
Show resolved Hide resolved
subash-suresh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""nidaqmx Helper classes and functions for MeasurementLink examples."""

from typing import Optional

import grpc
import nidaqmx

import ni_measurementlink_service as nims


def create_task(
session_info: nims.session_management.SessionInformation,
session_grpc_channel: Optional[grpc.Channel] = None,
initialization_behavior=nidaqmx.SessionInitializationBehavior.AUTO,
) -> nidaqmx.Task:
"""Create daqmx task based on reserved session and grpc channel."""
session_kwargs = {}

if session_grpc_channel is not None:
session_kwargs["grpc_options"] = nidaqmx.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)

return nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)
80 changes: 26 additions & 54 deletions examples/nidaqmx_analog_input/measurement.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Perform a finite analog input measurement with NI-DAQmx."""

import contextlib
import logging
import pathlib
from typing import Optional
Expand All @@ -11,10 +10,13 @@
from _helpers import (
ServiceOptions,
configure_logging,
create_session_management_client,
get_grpc_device_channel,
get_service_options,
grpc_device_options,
verbosity_option,
)
from _nidaqmx_helpers import create_task
from nidaqmx.constants import TaskMode

import ni_measurementlink_service as nims
Expand Down Expand Up @@ -46,24 +48,16 @@ def measure(pin_name, sample_rate, number_of_samples):
sample_rate,
subash-suresh marked this conversation as resolved.
Show resolved Hide resolved
number_of_samples,
)
session_management_client = nims.session_management.Client(
grpc_channel=measurement_service.get_channel(
provided_interface=nims.session_management.GRPC_SERVICE_INTERFACE_NAME,
service_class=nims.session_management.GRPC_SERVICE_CLASS,
)
)
with contextlib.ExitStack() as stack:
reservation = stack.enter_context(
session_management_client.reserve_sessions(
context=measurement_service.context.pin_map_context,
pin_or_relay_names=[pin_name],
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
timeout=60,
)
)
session_management_client = create_session_management_client(measurement_service)
with session_management_client.reserve_sessions(
context=measurement_service.context.pin_map_context,
pin_or_relay_names=[pin_name],
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
timeout=60,
) as reservation:
if len(reservation.session_info) != 1:
measurement_service.context.abort(
grpc.StatusCode.INVALID_ARGUMENT,
Expand All @@ -80,49 +74,27 @@ def cancel_callback():

measurement_service.context.add_cancel_callback(cancel_callback)

task = stack.enter_context(_create_nidaqmx_task(session_info))
if not session_info.session_exists:
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)
grpc_device_channel = get_grpc_device_channel(measurement_service, nidaqmx, service_options)
with create_task(session_info, grpc_device_channel) as task:
if not session_info.session_exists:
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)

task.timing.cfg_samp_clk_timing(
rate=sample_rate,
samps_per_chan=number_of_samples,
)
task.timing.cfg_samp_clk_timing(
rate=sample_rate,
samps_per_chan=number_of_samples,
)

timeout = min(measurement_service.context.time_remaining, 10.0)
voltage_values = task.read(number_of_samples_per_channel=number_of_samples, timeout=timeout)
task = None # Don't abort after this point
timeout = min(measurement_service.context.time_remaining, 10.0)
voltage_values = task.read(
number_of_samples_per_channel=number_of_samples, timeout=timeout
)
task = None # Don't abort after this point

_log_measured_values(voltage_values)
logging.info("Completed measurement")
return (voltage_values,)


def _create_nidaqmx_task(
session_info: nims.session_management.SessionInformation,
) -> nidaqmx.Task:
session_kwargs = {}
if service_options.use_grpc_device:
session_grpc_address = service_options.grpc_device_address

if not session_grpc_address:
session_grpc_channel = measurement_service.get_channel(
provided_interface=nidaqmx.GRPC_SERVICE_INTERFACE_NAME,
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
else:
session_grpc_channel = measurement_service.channel_pool.get_channel(
target=session_grpc_address
)
session_kwargs["grpc_options"] = nidaqmx.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=nidaqmx.SessionInitializationBehavior.AUTO,
)

return nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)


def _log_measured_values(samples, max_samples_to_display=5):
"""Log the measured values."""
if len(samples) > max_samples_to_display:
Expand Down
38 changes: 15 additions & 23 deletions examples/nidaqmx_analog_input/teststand_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

import nidaqmx
from _helpers import GrpcChannelPoolHelper, PinMapClient, TestStandSupport
from nidaqmx.grpc_session_options import (
GRPC_SERVICE_INTERFACE_NAME,
GrpcSessionOptions,
SessionInitializationBehavior,
)
from _nidaqmx_helpers import create_task

import ni_measurementlink_service as nims

Expand Down Expand Up @@ -47,27 +43,25 @@ def create_nidaqmx_tasks(sequence_context: Any) -> None:
session_management_client = nims.session_management.Client(
grpc_channel=grpc_channel_pool.session_management_channel
)
session_kwargs = {}

teststand_support = TestStandSupport(sequence_context)
pin_map_id = teststand_support.get_active_pin_map_id()

pin_map_context = nims.session_management.PinMapContext(pin_map_id=pin_map_id, sites=None)
grpc_device_channel = grpc_channel_pool.get_grpc_device_channel(
nidaqmx.GRPC_SERVICE_INTERFACE_NAME
)
with session_management_client.reserve_sessions(
context=pin_map_context,
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# This code module sets up the sessions, so error immediately if they are in use.
timeout=0,
) as reservation:
for session_info in reservation.session_info:
session_kwargs["grpc_options"] = GrpcSessionOptions(
grpc_channel_pool.get_grpc_device_channel(GRPC_SERVICE_INTERFACE_NAME),
session_name=session_info.session_name,
initialization_behavior=SessionInitializationBehavior.INITIALIZE_SERVER_SESSION,
task = create_task(
session_info,
grpc_device_channel,
initialization_behavior=nidaqmx.SessionInitializationBehavior.INITIALIZE_SERVER_SESSION,
)

# Leave session open
task = nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)

session_management_client.register_sessions(reservation.session_info)
Expand All @@ -79,7 +73,9 @@ def destroy_nidaqmx_tasks() -> None:
session_management_client = nims.session_management.Client(
grpc_channel=grpc_channel_pool.session_management_channel
)

grpc_device_channel = grpc_channel_pool.get_grpc_device_channel(
nidaqmx.GRPC_SERVICE_INTERFACE_NAME
)
with session_management_client.reserve_all_registered_sessions(
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# This code module sets up the sessions, so error immediately if they are in use.
Expand All @@ -88,13 +84,9 @@ def destroy_nidaqmx_tasks() -> None:
session_management_client.unregister_sessions(reservation.session_info)

for session_info in reservation.session_info:
grpc_options = GrpcSessionOptions(
grpc_channel_pool.get_grpc_device_channel(GRPC_SERVICE_INTERFACE_NAME),
session_name=session_info.session_name,
initialization_behavior=SessionInitializationBehavior.ATTACH_TO_SERVER_SESSION,
)

task = nidaqmx.Task(
new_task_name=session_info.session_name, grpc_options=grpc_options
task = create_task(
session_info,
grpc_device_channel,
initialization_behavior=nidaqmx.SessionInitializationBehavior.ATTACH_TO_SERVER_SESSION,
)
task.close()
26 changes: 18 additions & 8 deletions examples/nidcpower_source_dc_voltage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import pathlib
import types
from typing import Any, Callable, Dict, NamedTuple, TypeVar
from typing import Any, Callable, Dict, NamedTuple, TypeVar, Optional

import click
import grpc
Expand All @@ -15,7 +15,10 @@
pin_map_service_pb2,
pin_map_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.service import GrpcChannelPool, MeasurementService
from ni_measurementlink_service.measurement.service import (
GrpcChannelPool,
MeasurementService,
)


class ServiceOptions(NamedTuple):
Expand Down Expand Up @@ -236,13 +239,20 @@ def use_simulation_option(default: bool) -> Callable[[F], F]:


def get_grpc_device_channel(
measurement_service: MeasurementService, driver_module: types.ModuleType
) -> grpc.Channel:
measurement_service: MeasurementService,
driver_module: types.ModuleType,
service_options: ServiceOptions,
) -> Optional[grpc.Channel]:
"""Returns driver specific grpc device channel."""
return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
if service_options.use_grpc_device:
if service_options.grpc_device_address:
return measurement_service.channel_pool.get_channel(service_options.grpc_device_address)

return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
return None


def create_session_management_client(
Expand Down
38 changes: 9 additions & 29 deletions examples/nidcpower_source_dc_voltage/_nidcpower_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""nidcpower Helper classes and functions for MeasurementLink examples."""

from typing import Any, Dict, Iterable, Optional
from typing import Any, Dict, Optional

import grpc
import nidcpower
Expand All @@ -14,10 +14,8 @@

def create_session(
session_info: nims.session_management.SessionInformation,
session_grpc_channel: grpc.Channel,
initialization_behavior: Optional[
nidcpower.SessionInitializationBehavior
] = nidcpower.SessionInitializationBehavior.AUTO,
session_grpc_channel: Optional[grpc.Channel] = None,
initialization_behavior=nidcpower.SessionInitializationBehavior.AUTO,
) -> nidcpower.Session:
"""Create driver session based on reserved session and grpc channel."""
options: Dict[str, Any] = {}
Expand All @@ -27,31 +25,13 @@ def create_session(

session_kwargs: Dict[str, Any] = {}

session_kwargs["grpc_options"] = nidcpower.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)
if session_grpc_channel is not None:
session_kwargs["grpc_options"] = nidcpower.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)

return nidcpower.Session(
resource_name=session_info.resource_name, options=options, **session_kwargs
)


def reserve_session(
session_management_client: nims.session_management.Client,
pin_map_context: nims.session_management.PinMapContext,
pin_names: Optional[Iterable[str]] = None,
timeout: Optional[float] = None,
) -> nims.session_management.Reservation:
"""Reserve session(s).

Reserve session(s) for the given pins and returns the
information needed to create or access the session.
"""
return session_management_client.reserve_sessions(
context=pin_map_context,
pin_or_relay_names=pin_names,
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DCPOWER,
timeout=timeout,
)
Loading