Skip to content

Commit

Permalink
[ServiceBus] Topic Sender Implementation (Azure#10748)
Browse files Browse the repository at this point in the history
* topic sender implementation

* update changelog

* minor update on sample and samples/readme
  • Loading branch information
yunhaoling authored and iscai-msft committed Apr 13, 2020
1 parent e49d143 commit ce504cb
Show file tree
Hide file tree
Showing 13 changed files with 350 additions and 7 deletions.
4 changes: 4 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 7.0.0b2 (Unreleased)

**New Features**

* Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic.

**BugFixes**

* Fig bug where http_proxy and transport_type in ServiceBusClient are not propagated into Sender/Receiver creation properly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,38 @@ def get_queue_receiver(self, queue_name, **kwargs):
)

return receiver

def get_topic_sender(self, topic_name, **kwargs):
# type: (str, Any) -> ServiceBusSender
"""Get ServiceBusSender for the specific topic.
:param str topic_name: The path of specific Service Bus Topic the client connects to.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:rtype: ~azure.servicebus.ServiceBusSender
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START create_topic_sender_from_sb_client_sync]
:end-before: [END create_topic_sender_from_sb_client_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the ServiceBusSender from ServiceBusClient.
"""
sender = ServiceBusSender(
fully_qualified_namespace=self.fully_qualified_namespace,
topic_name=topic_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
**kwargs
)

return sender
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,10 @@ def from_connection_string(
"""Create a ServiceBusSender from a connection string.
:param conn_str: The connection string of a Service Bus.
:keyword str queue_name: The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided.
:keyword str topic_name: The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided.
:keyword str queue_name: The path of specific Service Bus Queue the client connects to.
Only one of queue_name or topic_name can be provided.
:keyword str topic_name: The path of specific Service Bus Topic the client connects to.
Only one of queue_name or topic_name can be provided.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,38 @@ def get_queue_receiver(self, queue_name, **kwargs):
)

return receiver

def get_topic_sender(self, topic_name, **kwargs):
# type: (str, Any) -> ServiceBusSender
"""Get ServiceBusSender for the specific topic.
:param str topic_name: The path of specific Service Bus Topic the client connects to.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:rtype: ~azure.servicebus.aio.ServiceBusSender
.. admonition:: Example:
.. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
:start-after: [START create_topic_sender_from_sb_client_async]
:end-before: [END create_topic_sender_from_sb_client_async]
:language: python
:dedent: 4
:caption: Create a new instance of the ServiceBusSender from ServiceBusClient.
"""
sender = ServiceBusSender(
fully_qualified_namespace=self.fully_qualified_namespace,
topic_name=topic_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
**kwargs
)

return sender
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ class ServiceBusSender(BaseHandlerAsync, SenderMixin):
implements a particular interface for getting tokens. It accepts
:class:`ServiceBusSharedKeyCredential<azure.servicebus.ServiceBusSharedKeyCredential>`, or credential objects
generated by the azure-identity library and objects that implement the `get_token(self, *scopes)` method.
:keyword str queue_name: The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided.
:keyword str topic_name: The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided.
:keyword str queue_name: The path of specific Service Bus Queue the client connects to.
Only one of queue_name or topic_name can be provided.
:keyword str topic_name: The path of specific Service Bus Topic the client connects to.
Only one of queue_name or topic_name can be provided.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/azure-servicebus/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp
- [send_queue.py](./sync_samples/send_queue.py) ([async version](./async_samples/send_queue_async.py)) - Examples to send messages to a service bus queue:
- From a connection string
- Enabling Logging
- [send_topic.py](./sync_samples/send_topic.py) ([async version](./async_samples/send_topic_async.py)) - Examples to send messages to a service bus topic:
- From a connection string
- Enabling Logging
- [receive_queue.py](./sync_samples/receive_queue.py) ([async_version](./async_samples/receive_queue_async.py)) - Examples to receive messages from a service bus queue:
- Receive messages
- [receive_peek.py](./sync_samples/receive_peek.py) ([async_version](./async_samples/receive_peek_async.py)) - Examples to peek messages from a service bus queue:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ async def example_create_servicebus_sender_async():
async with servicebus_client:
queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
# [END create_servicebus_sender_from_sb_client_async]

# [START create_topic_sender_from_sb_client_async]
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
# [END create_topic_sender_from_sb_client_async]

return queue_sender


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Example to show sending message(s) to a Service Bus Topic asynchronously.
"""

# pylint: disable=C0111

import os
import asyncio
from azure.servicebus import Message
from azure.servicebus.aio import ServiceBusClient

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
TOPIC_NAME = os.environ["SERVICE_BUS_TOPIC_NAME"]


async def send_single_message(sender):
message = Message("DATA" * 64)
await sender.send(message)


async def send_batch_message(sender):
batch_message = await sender.create_batch()
while True:
try:
batch_message.add(Message("DATA" * 256))
except ValueError:
# BatchMessage object reaches max_size.
# New BatchMessage object can be created here to send more data.
break
await sender.send(batch_message)


async def main():
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

async with servicebus_client:
sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
async with sender:
await send_single_message(sender)
await send_batch_message(sender)

print("Send message is done.")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ def example_create_servicebus_sender_sync():
with servicebus_client:
queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
# [END create_servicebus_sender_from_sb_client_sync]

# [START create_topic_sender_from_sb_client_sync]
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
# [END create_topic_sender_from_sb_client_sync]

return queue_sender


Expand Down
45 changes: 45 additions & 0 deletions sdk/servicebus/azure-servicebus/samples/sync_samples/send_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Example to show sending message(s) to a Service Bus Topic.
"""

# pylint: disable=C0111

import os
from azure.servicebus import ServiceBusClient, Message

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
TOPIC_NAME = os.environ["SERVICE_BUS_TOPIC_NAME"]


def send_single_message(sender):
message = Message("DATA" * 64)
sender.send(message)


def send_batch_message(sender):
batch_message = sender.create_batch()
while True:
try:
batch_message.add(Message("DATA" * 256))
except ValueError:
# BatchMessage object reaches max_size.
# New BatchMessage object can be created here to send more data.
break
sender.send(batch_message)


servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
with sender:
send_single_message(sender)
send_batch_message(sender)

print("Send message is done.")
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import asyncio
import logging
import sys
import os
import pytest
import time
from datetime import datetime, timedelta

from devtools_testutils import AzureMgmtTestCase, RandomNameResourceGroupPreparer, CachedResourceGroupPreparer

from azure.servicebus.aio import ServiceBusClient, ServiceBusSharedKeyCredential
from azure.servicebus._common.message import Message
from servicebus_preparer import (
ServiceBusNamespacePreparer,
ServiceBusTopicPreparer,
CachedServiceBusNamespacePreparer,
CachedServiceBusTopicPreparer
)
from utilities import get_logger, print_message

_logger = get_logger(logging.DEBUG)


class ServiceBusTopicsAsyncTests(AzureMgmtTestCase):
@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
async def test_topic_by_servicebus_client_conn_str_send_basic(self, servicebus_namespace_connection_string, servicebus_topic, **kwargs):

async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
logging_enable=False
) as sb_client:
async with sb_client.get_topic_sender(servicebus_topic.name) as sender:
message = Message(b"Sample topic message")
await sender.send(message)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
async def test_topic_by_sas_token_credential_conn_str_send_basic(self, servicebus_namespace, servicebus_namespace_key_name, servicebus_namespace_primary_key, servicebus_topic, **kwargs):
fully_qualified_namespace = servicebus_namespace.name + '.servicebus.windows.net'
async with ServiceBusClient(
fully_qualified_namespace=fully_qualified_namespace,
credential=ServiceBusSharedKeyCredential(
policy=servicebus_namespace_key_name,
key=servicebus_namespace_primary_key
),
logging_enable=False
) as sb_client:
async with sb_client.get_topic_sender(servicebus_topic.name) as sender:
message = Message(b"Sample topic message")
await sender.send(message)
5 changes: 2 additions & 3 deletions sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def remove_resource(self, name, **kwargs):
self.client.topics.delete(group.name, namespace.name, name, polling=False)



class ServiceBusSubscriptionPreparer(_ServiceBusChildResourcePreparer):
def __init__(self,
name_prefix='',
Expand Down Expand Up @@ -243,8 +242,6 @@ def _get_topic(self, **kwargs):
raise AzureTestError(template.format(ServiceBusTopicPreparer.__name__))




class ServiceBusQueuePreparer(_ServiceBusChildResourcePreparer):
def __init__(self,
name_prefix='',
Expand Down Expand Up @@ -433,5 +430,7 @@ def _get_queue(self, **kwargs):
'decorator @{} in front of this service bus preparer.'
raise AzureTestError(template.format(ServiceBusQueuePreparer.__name__))


CachedServiceBusNamespacePreparer = functools.partial(ServiceBusNamespacePreparer, use_cache=True)
CachedServiceBusQueuePreparer = functools.partial(ServiceBusQueuePreparer, use_cache=True)
CachedServiceBusTopicPreparer = functools.partial(ServiceBusTopicPreparer, use_cache=True)
Loading

0 comments on commit ce504cb

Please sign in to comment.