Skip to content

Commit

Permalink
IoT Hub Monitor Events - Enable users to specify max message count (#555
Browse files Browse the repository at this point in the history
)

* Mirabai's changes

* Updating branch

* remove mqtt

* Fix hub creation logic

* Fix paths

* updates

* revert to dev

* revert to dev

* IoT Hub monitor events add message count parameter

* Add tests

* Address comments

* Update history

* Update history
  • Loading branch information
avagraw authored Aug 17, 2022
1 parent 77ac439 commit f1ddeab
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 4 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ unreleased
* The Device Update command group supports all data plane functionality via **in-preview** `update` and `device`
sub-command groups. The data plane API version used is 2022-07-01-preview.

**IoT Hub Update**

* Updated the `az iot hub monitor-events` command to support an optional `--message-count` argument.
The message-count defines the maximum number of messages received from the hub before the monitor automatically stops.
If not provided the monitor keeps running until the user force-kills the monitor.


0.16.1
+++++++++++++++
Expand Down
3 changes: 3 additions & 0 deletions azext_iot/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
- name: Receive all messages and parse message payload as JSON
text: >
az iot hub monitor-events -n {iothub_name} --content-type application/json
- name: Receive the specified number of messages from hub and then shut down.
text: >
az iot hub monitor-events -n {iothub_name} --message-count {message_count}
"""

helps[
Expand Down
7 changes: 7 additions & 0 deletions azext_iot/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,13 @@ def load_arguments(self, _):
options_list=["--interface", "-i"],
help="Target interface identifier to filter on. For example: dtmi:com:example:TemperatureController;1",
)
context.argument(
"message_count",
options_list=["--message-count", "--mc"],
type=int,
help="Number of telemetry messages to capture before the monitor is terminated. "
"If not specified, monitor keeps running until meeting the timeout threshold of not receiving messages from hub.",
)

with self.argument_context("iot hub monitor-feedback") as context:
context.argument(
Expand Down
10 changes: 8 additions & 2 deletions azext_iot/common/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from threading import Event, Thread
from datetime import datetime
from knack.log import get_logger
from typing import Optional
from azure.cli.core.azclierror import (
CLIInternalError,
FileOperationError,
Expand Down Expand Up @@ -411,7 +412,7 @@ def calculate_millisec_since_unix_epoch_utc(offset_seconds: int = 0):
return int(1000 * ((now - epoch).total_seconds() + offset_seconds))


def init_monitoring(cmd, timeout, properties, enqueued_time, repair, yes):
def init_monitoring(cmd, timeout, properties, enqueued_time, repair, yes, message_count: Optional[int] = None):
from azext_iot.common.deps import ensure_uamqp

if timeout < 0:
Expand All @@ -420,6 +421,11 @@ def init_monitoring(cmd, timeout, properties, enqueued_time, repair, yes):
)
timeout = timeout * 1000

if message_count and message_count <= 0:
raise InvalidArgumentValueError(
"Message count must be greater than 0."
)

config = cmd.cli_ctx.config
output = cmd.cli_ctx.invocation.data.get("output", None)
if not output:
Expand All @@ -432,7 +438,7 @@ def init_monitoring(cmd, timeout, properties, enqueued_time, repair, yes):

if not enqueued_time:
enqueued_time = calculate_millisec_since_unix_epoch_utc()
return (enqueued_time, properties, timeout, output)
return (enqueued_time, properties, timeout, output, message_count)


def dict_clean(d):
Expand Down
8 changes: 8 additions & 0 deletions azext_iot/monitor/handlers/common_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
from azext_iot.monitor.base_classes import AbstractBaseEventsHandler
from azext_iot.monitor.parsers.common_parser import CommonParser
from azext_iot.monitor.models.arguments import CommonHandlerArguments
from azext_iot.monitor.utility import stop_monitor


class CommonHandler(AbstractBaseEventsHandler):
def __init__(self, common_handler_args: CommonHandlerArguments):
super(CommonHandler, self).__init__()
self._common_handler_args = common_handler_args
self.message_count = 0

def parse_message(self, message):
parser = CommonParser(
Expand All @@ -42,6 +44,12 @@ def parse_message(self, message):

print(dump, flush=True)

self.message_count += 1
if self._common_handler_args.max_messages and self.message_count == self._common_handler_args.max_messages:
message = "Successfully parsed {} message(s).".format(self._common_handler_args.max_messages)
print(message, flush=True)
stop_monitor()

def _should_process_device(self, device_id):
expected_device_id = self._common_handler_args.device_id
expected_devices = self._common_handler_args.devices
Expand Down
3 changes: 3 additions & 0 deletions azext_iot/monitor/models/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from azure.cli.core.commands import AzCliCommand
from azext_iot.common.utility import init_monitoring
from azext_iot.monitor.models.enum import Severity
from typing import Optional


class TelemetryArguments:
Expand Down Expand Up @@ -50,13 +51,15 @@ def __init__(
device_id="",
interface_name="",
module_id="",
max_messages: Optional[int] = None,
):
self.output = output
self.devices = devices or []
self.device_id = device_id or ""
self.interface_name = interface_name or ""
self.module_id = module_id or ""
self.common_parser_args = common_parser_args
self.max_messages = max_messages


class CentralHandlerArguments:
Expand Down
9 changes: 7 additions & 2 deletions azext_iot/operations/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from azext_iot._factory import SdkResolver, CloudError
from azext_iot.operations.generic import _execute_query, _process_top
from typing import Optional
import pprint

logger = get_logger(__name__)
Expand Down Expand Up @@ -2401,6 +2402,7 @@ def iot_hub_monitor_events(
login=None,
content_type=None,
device_query=None,
message_count: Optional[int] = None,
):
try:
_iot_hub_monitor_events(
Expand All @@ -2419,6 +2421,7 @@ def iot_hub_monitor_events(
login=login,
content_type=content_type,
device_query=device_query,
message_count=message_count,
)
except RuntimeError as e:
raise CLIInternalError(e)
Expand Down Expand Up @@ -2491,9 +2494,10 @@ def _iot_hub_monitor_events(
login=None,
content_type=None,
device_query=None,
message_count: Optional[int] = None,
):
(enqueued_time, properties, timeout, output) = init_monitoring(
cmd, timeout, properties, enqueued_time, repair, yes
(enqueued_time, properties, timeout, output, message_count) = init_monitoring(
cmd, timeout, properties, enqueued_time, repair, yes, message_count
)

device_ids = {}
Expand Down Expand Up @@ -2537,6 +2541,7 @@ def _iot_hub_monitor_events(
device_id=device_id,
interface_name=interface_name,
module_id=module_id,
max_messages=message_count,
)

handler = CommonHandler(handler_args)
Expand Down
19 changes: 19 additions & 0 deletions azext_iot/tests/iothub/core/test_iot_messaging_int.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,16 @@ def test_hub_monitor_events(self):
],
)

# Monitor events for all devices, limiting monitor message count to 10
num_messages = 10
monitor_stop_msgs = ["Successfully parsed {} message(s).".format(num_messages), "Stopping event monitor..."]
self.command_execute_assert(
"iot hub monitor-events -n {} -g {} --cg {} --et {} --message-count {} -y -p sys anno app".format(
self.entity_name, self.entity_rg, LIVE_CONSUMER_GROUPS[0], enqueued_time, num_messages
),
device_ids + monitor_stop_msgs,
)

# Monitor events for a single device
self.command_execute_assert(
"iot hub monitor-events -n {} -g {} -d {} --cg {} --et {} -t 8 -y -p all".format(
Expand Down Expand Up @@ -886,6 +896,15 @@ def test_hub_monitor_events(self):
device_subset_exclude,
)

# Expect failure when message count is negative
with pytest.raises(Exception):
self.command_execute_assert(
"iot hub monitor-events -n {} -g {} -d {} --et {} -t 8 -y -p sys anno app --mc -5".format(
self.entity_name, self.entity_rg, PREFIX_DEVICE + "*", enqueued_time
),
device_ids,
)

# Monitor events with --login parameter
self.command_execute_assert(
"iot hub monitor-events -t 8 -y -p all --cg {} --et {} --login {}".format(
Expand Down

0 comments on commit f1ddeab

Please sign in to comment.