#!/usr/bin/env python3
"""Monitor an Azure Event Hub consumer and log hourly statistics.

The script subscribes to an Event Hub through ``EventHubConsumerClient`` and
keeps in-memory counters, bucketed by hour, of:

* events received,
* errors reported by the client,
* partition assignments and partition closings,

plus a timestamped history of which partitions this consumer held. The
statistics are written to the log every ``EVENT_HUB_MONITOR_STATS_EVERY_SECS``
seconds (checked whenever an event arrives).

None of the callbacks in this file calls ``update_checkpoint``, so this script
itself never writes checkpoints.
"""

import copy
import datetime
import json
import logging
import numpy
import sys
import time

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Connection settings; must be filled in before running the script.
EVENT_HUB_MONITOR_CHECKPOINTS = ""
EVENT_HUB_MONITOR_CONSUMER_GROUP = ""
EVENT_HUB_MONITOR_CONNECTION_STRING = ""

# Minimum number of seconds between two statistics dumps.
EVENT_HUB_MONITOR_STATS_EVERY_SECS = 300

# Probability with which a single received event triggers a progress log line.
_PROGRESS_LOG_PROBABILITY = 0.0001

# Shared monitor state. Defined at module level (rather than only under the
# ``__main__`` guard) so the callbacks also work when this module is imported.
#   lastStatusPrint    -- epoch seconds of the last statistics dump
#   events/errors/assignments/closings
#                      -- {hour start (epoch seconds): count}
#   assignedPartitions -- [(epoch seconds, [partition ids])], one snapshot per
#                         assignment/closing, newest last
status = {
    "lastStatusPrint": time.time(),
    "events": {},
    "errors": {},
    "assignments": {},
    "closings": {},
    "assignedPartitions": [],
}


def _current_hour_epoch():
    """Return the epoch timestamp (seconds) of the start of the current hour."""
    return int(time.time()) // 3600 * 3600


def _increment_hourly_counter(kind):
    """Add one to the current-hour bucket of ``status[kind]``."""
    hour_epoch = _current_hour_epoch()
    counters = status[kind]
    counters[hour_epoch] = counters.get(hour_epoch, 0) + 1


def _log_hourly_counters(title, kind, skip_if_empty=False):
    """Log every hour bucket of ``status[kind]`` under the heading ``title``.

    Args:
        title: Heading line written before the per-hour lines.
        kind: Key into ``status``; also used as the unit in each line.
        skip_if_empty: When true, nothing (not even the heading) is logged if
            no bucket exists yet.
    """
    counters = status[kind]
    if skip_if_empty and not counters:
        return
    logging.warning(title)
    for hour_epoch, count in counters.items():
        hour = datetime.datetime.fromtimestamp(hour_epoch).strftime("%Y-%m-%d %H:00")
        logging.warning("\t%s: %u %s.", hour, count, kind)


def _latest_assignment():
    """Return a copy of the most recent partition snapshot (empty if none)."""
    history = status["assignedPartitions"]
    return list(history[-1][1]) if history else []


def printStats():
    """Log all hourly counters and the full partition assignment history.

    The error and closing sections are omitted while they are empty; the
    event and assignment sections are always printed.
    """
    _log_hourly_counters("Number of events read per hour:", "events")
    _log_hourly_counters("Number of errors per hour:", "errors", skip_if_empty=True)
    _log_hourly_counters("Number of partition assignments per hour:", "assignments")
    _log_hourly_counters("Number of partition closings per hour:", "closings", skip_if_empty=True)

    logging.warning("Partition assignment:")
    for epoch, assignment in status["assignedPartitions"]:
        when = datetime.datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M:%S")
        logging.warning("\t%s: %s", when, sorted(assignment))


def _log_progress(partition_context, event_data):
    """Log how far behind the newest enqueued event the given event is.

    Nothing is logged when either sequence number is unavailable, e.g. when
    the client has no last-enqueued-event properties for the partition.
    """
    properties = partition_context.last_enqueued_event_properties
    sequence_max = properties.get("sequence_number") if properties else None
    sequence_this = event_data.sequence_number
    if sequence_max is None or sequence_this is None:
        return
    logging.warning(
        "Reading event %u out of %u in partition %s (lag = %u)",
        sequence_this,
        sequence_max,
        partition_context.partition_id,
        sequence_max - sequence_this,
    )


def onEvent(partition_context, event_data):
    """Count a received event and occasionally log progress and statistics.

    Args:
        partition_context: Context of the partition the event came from.
        event_data: The received event. ``None`` is tolerated and is not
            counted as an event.
    """
    if event_data is not None:
        _increment_hourly_counter("events")
        # From time to time, print the progress of reading the data topics:
        if numpy.random.rand() < _PROGRESS_LOG_PROBABILITY:
            _log_progress(partition_context, event_data)

    # From time to time, print the status of the monitor:
    if time.time() - status["lastStatusPrint"] >= EVENT_HUB_MONITOR_STATS_EVERY_SECS:
        printStats()
        status["lastStatusPrint"] = time.time()


def onError(partition_context, exception):
    """Log an error reported by the client and count it in the hourly stats.

    Args:
        partition_context: Context of the affected partition, as passed by
            the client; it is only formatted into the log message.
        exception: The error that was raised.
    """
    logging.error("Error: %s, %s", partition_context, exception)
    _increment_hourly_counter("errors")


def onPartitionInitialize(partition_context):
    """Record that a partition was assigned to this consumer.

    Appends a new snapshot (previous snapshot plus this partition) to the
    assignment history and counts the assignment in the hourly stats.
    """
    partition = int(partition_context.partition_id)
    assignment = _latest_assignment()
    # Avoid listing the same partition twice if it is initialized again
    # without a close having been recorded in between.
    if partition not in assignment:
        assignment.append(partition)
    status["assignedPartitions"].append((time.time(), assignment))
    _increment_hourly_counter("assignments")
    logging.warning("Assigned partition %u.", partition)


def onPartitionClose(partition_context, close_reason):
    """Record that a partition was released by this consumer.

    Appends a new snapshot (previous snapshot minus this partition) to the
    assignment history and counts the closing in the hourly stats.

    Args:
        partition_context: Context of the partition being closed.
        close_reason: Reason supplied by the client; only logged.
    """
    partition = int(partition_context.partition_id)
    assignment = _latest_assignment()
    # A close for a partition that was never recorded as assigned must not
    # crash the callback; the snapshot is simply left unchanged.
    if partition in assignment:
        assignment.remove(partition)
    status["assignedPartitions"].append((time.time(), assignment))
    _increment_hourly_counter("closings")
    logging.warning("Closed partition %u with reason: %s.", partition, close_reason)


def main():
    """Configure logging, connect to the Event Hub and receive events."""
    logging.basicConfig(
        format="%(levelname)s @ %(asctime)s: %(message)s",
        stream=sys.stdout,
        level=logging.WARNING,
    )

    # Start the statistics interval when the monitor starts, not at import.
    status["lastStatusPrint"] = time.time()

    # NOTE(review): the second positional argument of
    # BlobCheckpointStore.from_connection_string is the blob container name;
    # the consumer group name is passed here. Confirm the container is really
    # meant to share the consumer group's name.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        EVENT_HUB_MONITOR_CHECKPOINTS,
        EVENT_HUB_MONITOR_CONSUMER_GROUP,
    )
    client = EventHubConsumerClient.from_connection_string(
        conn_str=EVENT_HUB_MONITOR_CONNECTION_STRING,
        consumer_group=EVENT_HUB_MONITOR_CONSUMER_GROUP,
        checkpoint_store=checkpoint_store,
        auth_timeout=0,
    )
    with client:
        client.receive(
            on_event=onEvent,
            on_error=onError,
            on_partition_initialize=onPartitionInitialize,
            on_partition_close=onPartitionClose,
            starting_position="-1",
            # Needed so that onEvent can read last_enqueued_event_properties.
            track_last_enqueued_event_properties=True,
        )


if __name__ == "__main__":
    main()