Skip to content

Commit

Permalink
[EventHub] update arm template with storage conn str (#20376)
Browse files Browse the repository at this point in the history
* update test resources

* fix

* fix failing tests

* adams comments

* pylint

* remove dateutil

* adams comments

* nit
  • Loading branch information
swathipil authored Sep 14, 2021
1 parent 279e7eb commit 3fdbaa7
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import time
import logging
import calendar
import dateutil.parser
from azure.core import MatchConditions
from azure.eventhub import CheckpointStore # type: ignore # pylint: disable=no-name-in-module
from azure.eventhub.exceptions import OwnershipLostError # type: ignore
Expand All @@ -18,6 +17,8 @@
)
from ._vendor.data.tables import TableClient, UpdateMode
from ._vendor.data.tables._base_client import parse_connection_str
from ._vendor.data.tables._deserialize import clean_up_dotnet_timestamps
from ._vendor.data.tables._common_conversion import TZ_UTC

logger = logging.getLogger(__name__)

Expand All @@ -39,6 +40,19 @@ def _to_timestamp(date):
timestamp += date.microsecond / 1e6
return timestamp

def _timestamp_to_datetime(value):
# Cosmos returns this with a decimal point that throws an error on deserialization
cleaned_value = clean_up_dotnet_timestamps(value)
try:
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
tzinfo=TZ_UTC
)
except ValueError:
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=TZ_UTC
)
return dt_obj


class TableCheckpointStore(CheckpointStore):
"""A CheckpointStore that uses Azure Table Storage to store the partition ownership and checkpoint data.
Expand Down Expand Up @@ -113,13 +127,13 @@ def _create_ownership_entity(cls, ownership):
Create a dictionary with the `ownership` attributes.
"""
ownership_entity = {
"PartitionKey": "{} {} {} Ownership".format(
"PartitionKey": u"{} {} {} Ownership".format(
ownership["fully_qualified_namespace"],
ownership["eventhub_name"],
ownership["consumer_group"],
),
"RowKey": ownership["partition_id"],
"ownerid": ownership["owner_id"],
"RowKey": u"{}".format(ownership["partition_id"]),
"ownerid": u"{}".format(ownership["owner_id"]),
}
return ownership_entity

Expand All @@ -129,21 +143,21 @@ def _create_checkpoint_entity(cls, checkpoint):
Create a dictionary with `checkpoint` attributes.
"""
checkpoint_entity = {
"PartitionKey": "{} {} {} Checkpoint".format(
"PartitionKey": u"{} {} {} Checkpoint".format(
checkpoint["fully_qualified_namespace"],
checkpoint["eventhub_name"],
checkpoint["consumer_group"],
),
"RowKey": checkpoint["partition_id"],
"offset": checkpoint["offset"],
"sequencenumber": checkpoint["sequence_number"],
"RowKey": u"{}".format(checkpoint["partition_id"]),
"offset": u"{}".format(checkpoint["offset"]),
"sequencenumber": u"{}".format(checkpoint["sequence_number"]),
}
return checkpoint_entity

def _update_ownership(self, ownership, **kwargs):
"""_update_ownership mutates the passed in ownership."""
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
try:
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
metadata = self._table_client.update_entity(
mode=UpdateMode.REPLACE,
entity=ownership_entity,
Expand All @@ -166,7 +180,7 @@ def _update_ownership(self, ownership, **kwargs):
)
ownership["etag"] = metadata["etag"]
ownership["last_modified_time"] = _to_timestamp(
dateutil.parser.isoparse(metadata["content"]["Timestamp"])
_timestamp_to_datetime(metadata["content"]["Timestamp"])
)

def _claim_one_partition(self, ownership, **kwargs):
Expand Down Expand Up @@ -289,7 +303,7 @@ def list_checkpoints(
"eventhub_name": eventhub_name,
"consumer_group": consumer_group,
"partition_id": entity[u"RowKey"],
"sequence_number": entity[u"sequencenumber"],
"sequence_number": int(entity[u"sequencenumber"]),
"offset": str(entity[u"offset"]),
}
checkpoints_list.append(checkpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
from azure.eventhub.extensions.checkpointstoretable import TableCheckpointStore
from azure.eventhub.exceptions import OwnershipLostError

STORAGE_CONN_STR = [
#os.environ.get("AZURE_STORAGE_CONN_STR", "Azure Storage Connection String"),
os.environ.get("AZURE_COSMOS_CONN_STR", "Azure Storage Connection String"),
STORAGE_ENV_KEYS = [
"AZURE_TABLES_CONN_STR",
"AZURE_COSMOS_CONN_STR"
]


def get_live_storage_table_client(storage_connection_str):
def get_live_storage_table_client(conn_str_env_key):
try:
storage_connection_str = os.environ[conn_str_env_key]
table_name = "table{}".format(uuid.uuid4().hex)
table_service_client = TableServiceClient.from_connection_string(
storage_connection_str
Expand Down Expand Up @@ -176,35 +177,35 @@ def _update_and_list_checkpoint(storage_connection_str, table_name):
assert checkpoint_list[0]["offset"] == "30"


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_claim_ownership_exception(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_ownership_exception(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_claim_ownership_exception_test(storage_connection_str, table_name)
finally:
remove_live_storage_table_client(storage_connection_str, table_name)


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_claim_and_list_ownership(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_and_list_ownership(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_claim_and_list_ownership(storage_connection_str, table_name)
finally:
remove_live_storage_table_client(storage_connection_str, table_name)


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_update_checkpoint(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_update_checkpoint(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_update_and_list_checkpoint(storage_connection_str, table_name)
Expand Down
64 changes: 63 additions & 1 deletion sdk/eventhub/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@
"eventHubsNamespace": "[concat('eh-', parameters('baseName'))]",
"eventHubName": "[concat('eh-', parameters('baseName'), '-hub')]",
"eventHubAuthRuleName": "[concat('eh-', parameters('baseName'), '-hub-auth-rule')]",
"storageAccount": "[concat('blb', parameters('baseName'))]",
"storageAccount": "[concat('storage', parameters('baseName'))]",
"containerName": "your-blob-container-name",
"defaultSASKeyName": "RootManageSharedAccessKey",
"eventHubsAuthRuleResourceId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespace'), variables('defaultSASKeyName'))]",
"storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccount'))]",
"tablesMgmtApiVersion": "2019-04-01",
"tablesAuthorizationApiVersion": "2018-09-01-preview",
"tableDataContributorRoleId": "0a9a7e1f-b9d0-4cc4-a60d-0319b160aaa3"
},
"resources": [
{
Expand Down Expand Up @@ -140,6 +143,48 @@
}
]
},
{
"type": "Microsoft.DocumentDB/databaseAccounts",
"apiVersion": "2020-04-01",
"name": "[variables('storageAccount')]",
"location": "[parameters('location')]",
"tags": {
"defaultExperience": "Azure Table",
"hidden-cosmos-mmspecial": "",
"CosmosAccountType": "Non-Production"
},
"kind": "GlobalDocumentDB",
"properties": {
"publicNetworkAccess": "Enabled",
"enableAutomaticFailover": false,
"enableMultipleWriteLocations": false,
"isVirtualNetworkFilterEnabled": false,
"virtualNetworkRules": [],
"disableKeyBasedMetadataWriteAccess": false,
"enableFreeTier": false,
"enableAnalyticalStorage": false,
"databaseAccountOfferType": "Standard",
"consistencyPolicy": {
"defaultConsistencyLevel": "BoundedStaleness",
"maxIntervalInSeconds": 86400,
"maxStalenessPrefix": 1000000
},
"locations": [
{
"locationName": "[parameters('location')]",
"provisioningState": "Succeeded",
"failoverPriority": 0,
"isZoneRedundant": false
}
],
"capabilities": [
{
"name": "EnableTable"
}
],
"ipRules": []
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2019-04-01-preview",
Expand All @@ -159,6 +204,15 @@
"principalId": "[parameters('testApplicationOid')]",
"scope": "[resourceGroup().id]"
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "[variables('tablesAuthorizationApiVersion')]",
"name": "[guid(concat('tableDataContributorRoleId', resourceGroup().id))]",
"properties": {
"roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('tableDataContributorRoleId'))]",
"principalId": "[parameters('testApplicationOid')]"
}
}
],
"outputs": {
Expand Down Expand Up @@ -197,6 +251,14 @@
"AZURE_STORAGE_ACCESS_KEY":{
"type": "string",
"value": "[listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value]"
},
"AZURE_TABLES_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"
},
"AZURE_COSMOS_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(resourceId('Microsoft.DocumentDB/databaseAccounts', variables('storageAccount')), '2020-04-01').primaryMasterKey, ';TableEndpoint=https://', variables('storageAccount'), '.table.cosmos.azure.com:443/')]"
}
}
}

0 comments on commit 3fdbaa7

Please sign in to comment.