Skip to content

Commit

Permalink
Document code and README (#11)
Browse files Browse the repository at this point in the history
* Complete model docstrings

* Complete docstrings for messaging module

* Add example usage to README
  • Loading branch information
callumforrester authored Aug 6, 2024
1 parent 7a52dc8 commit 7304849
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 30 deletions.
48 changes: 33 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,44 @@

STOMP integration for bluesky

This is where you should write a short paragraph that describes what your module does,
how it does it, and why people should use it.

Source | <https://github.com/DiamondLightSource/bluesky-stomp>
:---: | :---:
PyPI | `pip install bluesky-stomp`
Releases | <https://github.com/DiamondLightSource/bluesky-stomp/releases>

This is where you should put some images or code snippets that illustrate
some relevant examples. If it is a library then you might put some
introductory code here:

```python
from bluesky_stomp import __version__
## Low Level API

print(f"Hello bluesky_stomp {__version__}")
```
The library comes with some helpers for interacting with a stomp broker:

Or if it is a commandline tool then you might put some example commands here:

```
python -m bluesky_stomp --version
```python
from bluesky_stomp.messaging import MessageContext, MessagingTemplate
from bluesky_stomp.models import Broker, MessageQueue, MessageTopic

# Assumes you have an unauthenticated broker such as ActiveMQ running on localhost:61613
template = MessagingTemplate.for_broker(Broker(host="localhost", port=61613))

try:
# Connect to the broker
template.connect()

# Send a message to a queue and a topic
template.send(MessageQueue(name="my-queue"), {"foo": 1, "bar": 2})
template.send(MessageTopic(name="my-topic"), {"foo": 1, "bar": 2})

# Subscribe to messages on a topic, print all messages received,
# assumes there is another service to post messages to the topic
def on_message(message: str, context: MessageContext) -> None:
print(message)

template.subscribe(MessageTopic(name="my-other-topic"), on_message)

# Send a message and wait for a reply, assumes there is another service
# post the reply
reply_future = template.send_and_receive(
MessageQueue(name="my-queue"), {"foo": 1, "bar": 2}
)
print(reply_future.result(timeout=5.0))
finally:
# Disconnect at the end
template.disconnect()
```
81 changes: 68 additions & 13 deletions src/bluesky_stomp/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class StompReconnectPolicy:


@dataclass
class Subscription:
class _Subscription:
"""
Details of a subscription, the template needs its own representation to
defer subscriptions until after connection
Expand All @@ -75,8 +75,8 @@ class Subscription:

class MessagingTemplate:
"""
MessagingTemplate that uses the stomp protocol, meant for use
with ActiveMQ.
Utility class with helper methods for communication with a STOMP
broker such as ActiveMQ or RabbitMQ.
"""

def __init__(
Expand All @@ -87,6 +87,20 @@ def __init__(
serializer: MessageSerializer = serialize_message,
deserializer: MessageDeserializer[Any] = deserialize_message,
) -> None:
"""
Constructor
Args:
conn: STOMP connection object
reconnect_policy: Policy for how to handle unexpected disconnection.
Defaults to the default settings of StompReconnectPolicy.
authentication: Authentication credentials, if required. Defaults to None.
serializer: Function to serialize messages to send to the broker.
Defaults to serialize_message.
deserializer: Function to deserialize messages from the broker.
Defaults to deserialize_message.
"""

self._conn = conn
self._reconnect_policy = reconnect_policy or StompReconnectPolicy()
self._authentication = authentication or None
Expand All @@ -99,10 +113,20 @@ def __init__(
self._listener.on_message = self._on_message
self._conn.set_listener("", self._listener) # type: ignore

self._subscriptions: dict[str, Subscription] = {}
self._subscriptions: dict[str, _Subscription] = {}

@classmethod
def for_broker(cls, broker: Broker) -> "MessagingTemplate":
"""
Alternative constructor that sets up a connection from host details
Args:
broker: Details of the message broker
Returns:
MessagingTemplate: A new template
"""

return cls(
Connection(
[(broker.host, broker.port)],
Expand All @@ -122,15 +146,14 @@ def send_and_receive(
Send a message expecting a single reply.
Args:
destination (str): Destination to send the message
obj (Any): Message to send, must be serializable
reply_type (Type, optional): Expected type of reply, used
in deserialization. Defaults to str.
correlation_id (Optional[str]): An id which correlates this request with
requests it spawns or the request which
spawned it etc.
destination: Destination to send the message
obj: Message to send, must be serializable
reply_type: Expected type of reply, used in deserialization.
Defaults to str.
correlation_id: An id which correlates this request with requests it
spawns or the request which spawned it etc.
Returns:
Future: Future representing the reply
Future[T]: Future representing the reply
"""

future: Future[T] = Future()
Expand All @@ -156,6 +179,20 @@ def send(
on_reply_error: ErrorListener | None = None,
correlation_id: str | None = None,
) -> None:
"""
Send a message to a given destination, handle a reply if required
Args:
destination: Where to send the message
obj: Message content
on_reply: Callback for a reply, if supplied, the broker will be asked
to create a temporary queue for replies. Defaults to None.
on_reply_error: How to handle any errors in processing replies.
Defaults to None.
correlation_id: Correlation ID to be optionally included in message
headers. Defaults to None.
"""

raw_destination = _destination(destination)
serialized_message = self._serializer(obj)
self._send_bytes(
Expand Down Expand Up @@ -221,6 +258,16 @@ def subscribe(
callback: MessageListener,
on_error: ErrorListener | None = None,
) -> None:
"""
Subscibe to messages on a particular queue or topic
Args:
destination: Where the messages will come from
callback: Function for processing the messages.
The message type is derived from the function signature.
on_error: How to handle errors processing the message. Defaults to None.
"""

logging.debug(f"New subscription to {destination}")
obj_type = determine_callback_deserialization_type(callback)

Expand All @@ -246,14 +293,18 @@ def wrapper(frame: Frame) -> None:
if isinstance(destination, TemporaryMessageQueue)
else str(next(self._sub_num))
)
self._subscriptions[sub_id] = Subscription(
self._subscriptions[sub_id] = _Subscription(
destination, wrapper, on_error=on_error
)
# If we're connected, subscribe immediately, otherwise the subscription is
# deferred until connection.
self._ensure_subscribed([sub_id])

def connect(self) -> None:
"""
Connect to the broker
"""

if not self._conn.is_connected():
connected: Event = Event()

Expand Down Expand Up @@ -296,6 +347,10 @@ def _ensure_subscribed(self, sub_ids: list[str] | None = None) -> None:
)

def disconnect(self) -> None:
"""
Disconnect from the broker
"""

logging.info("Disconnecting...")
if not self.is_connected():
logging.info("Already disconnected")
Expand Down
8 changes: 6 additions & 2 deletions src/bluesky_stomp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@


class BasicAuthentication(BaseModel):
"""
User credentials for basic authentication
"""

username: str = Field(description="Unique identifier for user")
password: str = Field(description="Password to verify user's identity")

Expand All @@ -23,10 +27,10 @@ def localhost(cls) -> "Broker":


class DestinationBase(BaseModel):
model_config = ConfigDict(frozen=True)

"""Base class for possible destinations of stomp messages"""

model_config = ConfigDict(frozen=True)


class MessageQueue(DestinationBase):
"""
Expand Down

0 comments on commit 7304849

Please sign in to comment.