From 02a5c31683da77aa35df3a7e820547298c2ddeb3 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Fri, 9 Apr 2021 19:56:12 -0400 Subject: [PATCH 1/2] updated migration guide --- .../azure-eventhub/migration_guide.md | 83 +++++++++++++++++-- 1 file changed, 76 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/migration_guide.md b/sdk/eventhub/azure-eventhub/migration_guide.md index 6825179943e6..d84d375c97a7 100644 --- a/sdk/eventhub/azure-eventhub/migration_guide.md +++ b/sdk/eventhub/azure-eventhub/migration_guide.md @@ -173,13 +173,6 @@ your program when receiving events. In V5, `EventHubConsumerClient` allows you to do the same with the `receive()` method if you pass a `CheckpointStore` to the constructor. -> **Note:** V1 checkpoints are not compatible with V5 checkpoints. -If pointed at the same blob, consumption will begin at the first message. -V1 checkpoint json in the respective blobs can be manually converted (per-partition) if needed. -In V1 checkpoints (sequence_number and offset) are stored in the format of json along with ownership information -as the content of the blob, while in V5, checkpoints are kept in the metadata of a blob and the metadata is composed of name-value pairs. -Please check [update_checkpoint](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py#L231-L250) in V5 for implementation detail. - So in v1: ```python import logging @@ -286,6 +279,82 @@ if __name__ == '__main__': loop.run_until_complete(main()) ``` +> **Note:** V1 checkpoints are not compatible with V5 checkpoints. +If pointed at the same blob, consumption will begin at the first message. +V1 checkpoint json in the respective blobs can be manually converted (per-partition) if needed. 
+In V1, checkpoints (sequence_number and offset) are stored as JSON, along with ownership information, +in the content of the blob, while in V5, checkpoints are kept in the metadata of a blob and the metadata is composed of name-value pairs. +Please check [update_checkpoint](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py#L231-L250) in V5 for implementation details. + +The following code snippet can be used to migrate checkpoint data from the legacy format. This snippet assumes that the default prefix configuration for the `EventProcessorHost` was used. If a custom prefix was configured, this code will need to be adjusted to account for the difference in format. +```python +import os +import json +from azure.storage.blob import BlobServiceClient, ContainerClient + +EVENT_HUB_HOSTNAME = os.environ["EVENT_HUB_HOSTNAME"] # Fully qualified namespace, e.g. my-namespace.servicebus.windows.net +EVENT_HUB_NAME = os.environ["EVENT_HUB_NAME"] +EVENT_HUB_CONSUMER_GROUP = "$Default" # Name of Event Hub consumer group + +STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] +BLOB_CONTAINER_NAME = "your-blob-container-name" # Blob container to upload updated checkpoint information to. +LEGACY_BLOB_CONTAINER_NAME = "your-legacy-blob-container-name" # Please make sure the legacy blob container resource exists. + + +def readLegacyCheckpoints( + storage_connection_str, legacy_blob_container_name, consumer_group +): + container_client = ContainerClient.from_connection_string( + storage_connection_str, legacy_blob_container_name + ) + legacy_checkpoints = [] + + # Read and process legacy checkpoints in blobs in container. 
+ for blob in container_client.list_blobs(): + blob_client = container_client.get_blob_client(blob) + stream = blob_client.download_blob() + for chunk in stream.chunks(): + legacy_checkpoints.append(json.loads(chunk.decode("UTF-8"))) + return legacy_checkpoints + + +if __name__ == "__main__": + legacy_checkpoints = readLegacyCheckpoints( + STORAGE_CONNECTION_STR, LEGACY_BLOB_CONTAINER_NAME, EVENT_HUB_CONSUMER_GROUP + ) + + # The checkpoint blobs require a specific naming scheme to be valid for use with the + # V5 CheckpointStore. + prefix = "{}/{}/{}/checkpoint/".format( + EVENT_HUB_HOSTNAME.lower(), + EVENT_HUB_NAME.lower(), + EVENT_HUB_CONSUMER_GROUP.lower(), + ) + + # Create the storage client to write the migrated checkpoints. This example + # assumes that the connection string grants the appropriate permissions to create a + # container in the storage account. + blob_service_client = BlobServiceClient.from_connection_string( + STORAGE_CONNECTION_STR + ) + container_client = blob_service_client.get_container_client(BLOB_CONTAINER_NAME) + try: + # Create container if it doesn't already exist. + container_client.create_container() + except Exception: + pass + + # Translate each legacy checkpoint, storing offset and sequence data into correct + # blob to align with V5 BlobCheckpointStore. 
+ for checkpoint in legacy_checkpoints: + metadata = { + "offset": str(checkpoint["offset"]), + "sequence_number": str(checkpoint["sequence_number"]), + } + name = "{}{}".format(prefix, checkpoint["partition_id"]) + container_client.upload_blob(name, data="", metadata=metadata) +``` + ## Additional samples More examples can be found at [Samples for azure-eventhub](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub/samples) \ No newline at end of file From 696249a5170b427b0e07f7cf502cc7ea4c9fd94f Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Thu, 15 Apr 2021 12:31:44 -0700 Subject: [PATCH 2/2] small fix --- sdk/eventhub/azure-eventhub/migration_guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/migration_guide.md b/sdk/eventhub/azure-eventhub/migration_guide.md index d84d375c97a7..a6cfb1113d2e 100644 --- a/sdk/eventhub/azure-eventhub/migration_guide.md +++ b/sdk/eventhub/azure-eventhub/migration_guide.md @@ -349,7 +349,7 @@ if __name__ == "__main__": for checkpoint in legacy_checkpoints: metadata = { "offset": str(checkpoint["offset"]), - "sequence_number": str(checkpoint["sequence_number"]), + "sequencenumber": str(checkpoint["sequence_number"]), } name = "{}{}".format(prefix, checkpoint["partition_id"]) container_client.upload_blob(name, data="", metadata=metadata)