Skip to content

Commit

Permalink
Adapt to azure core's cloud event (#17063)
Browse files Browse the repository at this point in the history
* Adapt to cloud event's azure core

* fix tests

* fix

* shared reqs

* raw request hook

* lint
  • Loading branch information
Rakshith Bhyravabhotla authored Mar 3, 2021
1 parent d18ddd9 commit 1a105e3
Show file tree
Hide file tree
Showing 21 changed files with 106 additions and 194 deletions.
1 change: 1 addition & 0 deletions sdk/eventgrid/azure-eventgrid/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 2.0.0b6 (Unreleased)

**Breaking Changes**
- `~azure.eventgrid.CloudEvent` is now removed in favor of `~azure.core.messaging.CloudEvent`.
- All the `SystemEventNames` related to Azure Communication Service starting with `ACS****` are renamed to `Acs***` to honor pascal case.

**Features**
Expand Down
8 changes: 5 additions & 3 deletions sdk/eventgrid/azure-eventgrid/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ This example publishes a Cloud event.
```Python
import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, CloudEvent
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
Expand All @@ -166,7 +167,7 @@ client.send(event)
This example consumes a message received from storage queue and deserializes it to a CloudEvent object.

```Python
from azure.eventgrid import CloudEvent
from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json
Expand Down Expand Up @@ -244,7 +245,8 @@ Once the `tracer` and `exporter` are set, please follow the example below to sta

```python
import os
from azure.eventgrid import EventGridPublisherClient, CloudEvent
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential

hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
Expand Down
3 changes: 1 addition & 2 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
from ._publisher_client import EventGridPublisherClient
from ._event_mappings import SystemEventNames
from ._helpers import generate_sas
from ._models import CloudEvent, EventGridEvent
from ._models import EventGridEvent
from ._version import VERSION

__all__ = [
"EventGridPublisherClient",
"CloudEvent",
"EventGridEvent",
"generate_sas",
"SystemEventNames",
Expand Down
26 changes: 26 additions & 0 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from ._signature_credential_policy import EventGridSasCredentialPolicy
from . import _constants as constants

from ._generated.models import (
CloudEvent as InternalCloudEvent,
)

if TYPE_CHECKING:
from datetime import datetime

Expand Down Expand Up @@ -134,3 +138,25 @@ def _eventgrid_data_typecheck(event):
"Data in EventGridEvent cannot be bytes. Please refer to"
"https://docs.microsoft.com/en-us/azure/event-grid/event-schema"
)

def _cloud_event_to_generated(cloud_event, **kwargs):
if isinstance(cloud_event.data, six.binary_type):
data_base64 = cloud_event.data
data = None
else:
data = cloud_event.data
data_base64 = None
return InternalCloudEvent(
id=cloud_event.id,
source=cloud_event.source,
type=cloud_event.type,
specversion=cloud_event.specversion,
data=data,
data_base64=data_base64,
time=cloud_event.time,
dataschema=cloud_event.dataschema,
datacontenttype=cloud_event.datacontenttype,
subject=cloud_event.subject,
additional_properties=cloud_event.extensions,
**kwargs
)
148 changes: 2 additions & 146 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,160 +3,16 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint:disable=protected-access
from typing import Union, Any, Dict
from typing import Any
import datetime as dt
import uuid
import json
import six
from msrest.serialization import UTC
from ._generated.models import (
EventGridEvent as InternalEventGridEvent,
CloudEvent as InternalCloudEvent,
)


class EventMixin(object):
"""
Mixin for the event models comprising of some helper methods.
"""

@staticmethod
def _from_json(event, encode):
"""
Load the event into the json
:param dict eventgrid_event: The event to be deserialized.
:type eventgrid_event: Union[str, dict, bytes]
:param str encode: The encoding to be used. Defaults to 'utf-8'
"""
if isinstance(event, six.binary_type):
event = json.loads(event.decode(encode))
elif isinstance(event, six.string_types):
event = json.loads(event)
return event


class CloudEvent(EventMixin): # pylint:disable=too-many-instance-attributes
"""Properties of an event published to an Event Grid topic using the CloudEvent 1.0 Schema.
All required parameters must be populated in order to send to Azure.
If data is of binary type, data_base64 can be used alternatively. Note that data and data_base64
cannot be present at the same time.
:param source: Required. Identifies the context in which an event happened. The combination of id and source must
be unique for each distinct event. If publishing to a domain topic, source must be the domain name.
:type source: str
:param type: Required. Type of event related to the originating occurrence.
:type type: str
:keyword data: Optional. Event data specific to the event type. Only one of the `data` or `data_base64`
argument must be present. If data is of bytes type, it will be sent as data_base64 in the outgoing request.
:type data: object
:keyword time: Optional. The time (in UTC) the event was generated, in RFC3339 format.
:type time: ~datetime.datetime
:keyword dataschema: Optional. Identifies the schema that data adheres to.
:type dataschema: str
:keyword datacontenttype: Optional. Content type of data value.
:type datacontenttype: str
:keyword subject: Optional. This describes the subject of the event in the context of the event producer
(identified by source).
:type subject: str
:keyword specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0"
:type specversion: str
:keyword id: Optional. An identifier for the event. The combination of id and source must be
unique for each distinct event. If not provided, a random UUID will be generated and used.
:type id: Optional[str]
:keyword data_base64: Optional. Event data specific to the event type if the data is of bytes type.
Only data of bytes type is accepted by `data-base64` and only one of the `data` or `data_base64` argument
must be present.
:type data_base64: bytes
:ivar source: Identifies the context in which an event happened. The combination of id and source must
be unique for each distinct event. If publishing to a domain topic, source must be the domain name.
:vartype source: str
:ivar data: Event data specific to the event type.
:vartype data: object
:ivar data_base64: Event data specific to the event type if the data is of bytes type.
:vartype data_base64: bytes
:ivar type: Type of event related to the originating occurrence.
:vartype type: str
:ivar time: The time (in UTC) the event was generated, in RFC3339 format.
:vartype time: ~datetime.datetime
:ivar dataschema: Identifies the schema that data adheres to.
:vartype dataschema: str
:ivar datacontenttype: Content type of data value.
:vartype datacontenttype: str
:ivar subject: This describes the subject of the event in the context of the event producer
(identified by source).
:vartype subject: str
:ivar specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0"
:vartype specversion: str
:ivar id: An identifier for the event. The combination of id and source must be
unique for each distinct event. If not provided, a random UUID will be generated and used.
:vartype id: Optional[str]
"""

def __init__(self, source, type, **kwargs): # pylint: disable=redefined-builtin
# type: (str, str, Any) -> None
self.source = source
self.type = type
self.specversion = kwargs.pop("specversion", "1.0")
self.id = kwargs.pop("id", str(uuid.uuid4()))
self.time = kwargs.pop("time", dt.datetime.now(UTC()).isoformat())
self.data = kwargs.pop("data", None)
self.datacontenttype = kwargs.pop("datacontenttype", None)
self.dataschema = kwargs.pop("dataschema", None)
self.subject = kwargs.pop("subject", None)
self.data_base64 = kwargs.pop("data_base64", None)
self.extensions = {}
self.extensions.update(dict(kwargs.pop("extensions", {})))
if self.data is not None and self.data_base64 is not None:
raise ValueError(
"data and data_base64 cannot be provided at the same time.\
Use data_base64 only if you are sending bytes, and use data otherwise."
)

@classmethod
def _from_generated(cls, cloud_event, **kwargs):
# type: (Union[str, Dict, bytes], Any) -> CloudEvent
generated = InternalCloudEvent.deserialize(cloud_event)
if generated.additional_properties:
extensions = dict(generated.additional_properties)
kwargs.setdefault("extensions", extensions)
return cls(
id=generated.id,
source=generated.source,
type=generated.type,
specversion=generated.specversion,
data=generated.data or generated.data_base64,
time=generated.time,
dataschema=generated.dataschema,
datacontenttype=generated.datacontenttype,
subject=generated.subject,
**kwargs
)

def _to_generated(self, **kwargs):
if isinstance(self.data, six.binary_type):
data_base64 = self.data
data = None
else:
data = self.data
data_base64 = None
return InternalCloudEvent(
id=self.id,
source=self.source,
type=self.type,
specversion=self.specversion,
data=data,
data_base64=self.data_base64 or data_base64,
time=self.time,
dataschema=self.dataschema,
datacontenttype=self.datacontenttype,
subject=self.subject,
additional_properties=self.extensions,
**kwargs
)


class EventGridEvent(InternalEventGridEvent, EventMixin):
class EventGridEvent(InternalEventGridEvent):
"""Properties of an event published to an Event Grid topic using the EventGrid Schema.
Variables are only populated by the server, and will be ignored when sending a request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
HttpLoggingPolicy,
UserAgentPolicy,
)
from azure.core.messaging import CloudEvent

from ._models import CloudEvent, EventGridEvent
from ._models import EventGridEvent
from ._helpers import (
_get_endpoint_only_fqdn,
_get_authentication_policy,
_is_cloud_event,
_is_eventgrid_event,
_eventgrid_data_typecheck,
_cloud_event_to_generated,
)
from ._generated._event_grid_publisher_client import (
EventGridPublisherClient as EventGridPublisherClientImpl,
Expand Down Expand Up @@ -179,7 +181,7 @@ def send(self, events, **kwargs):
if isinstance(events[0], CloudEvent) or _is_cloud_event(events[0]):
try:
events = [
cast(CloudEvent, e)._to_generated(**kwargs) for e in events # pylint: disable=protected-access
_cloud_event_to_generated(e, **kwargs) for e in events # pylint: disable=protected-access
]
except AttributeError:
pass # means it's a dictionary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Any, Union, List, Dict, cast
from azure.core.credentials import AzureKeyCredential, AzureSasCredential
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.messaging import CloudEvent
from azure.core.pipeline.policies import (
RequestIdPolicy,
HeadersPolicy,
Expand All @@ -23,13 +24,14 @@
UserAgentPolicy,
)
from .._policies import CloudEventDistributedTracingPolicy
from .._models import CloudEvent, EventGridEvent
from .._models import EventGridEvent
from .._helpers import (
_get_endpoint_only_fqdn,
_get_authentication_policy,
_is_cloud_event,
_is_eventgrid_event,
_eventgrid_data_typecheck,
_cloud_event_to_generated,
)
from .._generated.aio import EventGridPublisherClient as EventGridPublisherClientAsync
from .._version import VERSION
Expand Down Expand Up @@ -172,7 +174,7 @@ async def send(self, events: SendType, **kwargs: Any) -> None:
if isinstance(events[0], CloudEvent) or _is_cloud_event(events[0]):
try:
events = [
cast(CloudEvent, e)._to_generated(**kwargs) for e in events # pylint: disable=protected-access
_cloud_event_to_generated(e, **kwargs) for e in events # pylint: disable=protected-access
]
except AttributeError:
pass # means it's a dictionary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""
import os
import asyncio
from azure.eventgrid import CloudEvent
from azure.core.messaging import CloudEvent
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# [START publish_cloud_event_to_topic_async]
import os
import asyncio
from azure.eventgrid import CloudEvent
from azure.core.messaging import CloudEvent
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
3) STORAGE_QUEUE_NAME: The name of the storage queue.
"""

from azure.eventgrid import CloudEvent
from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import time

from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, CloudEvent
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ.get("CLOUD_ACCESS_KEY")
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import time

from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, CloudEvent
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

domain_key = os.environ["DOMAIN_ACCESS_KEY"]
domain_endpoint = os.environ["DOMAIN_TOPIC_HOSTNAME"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

from datetime import datetime, timedelta
from azure.core.credentials import AzureSasCredential
from azure.eventgrid import EventGridPublisherClient, CloudEvent, generate_sas
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient, generate_sas

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
python sample_consume_custom_payload.py
"""

from azure.eventgrid import CloudEvent
from azure.core.messaging import CloudEvent
import json

# all types of CloudEvents below produce same DeserializedEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
"""
# [START publish_cloud_event_to_topic]
import os
from azure.eventgrid import EventGridPublisherClient, CloudEvent
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

topic_key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventgrid/azure-eventgrid/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
]),
install_requires=[
'msrest>=0.6.19',
'azure-core<2.0.0,>=1.10.0',
'azure-core<2.0.0,>=1.12.0',
],
extras_require={
":python_version<'3.0'": ['azure-nspkg'],
Expand Down
Loading

0 comments on commit 1a105e3

Please sign in to comment.