Skip to content

Commit

Permalink
Fix tests and test correlation ID in stomp template
Browse files Browse the repository at this point in the history
  • Loading branch information
callumforrester committed Mar 9, 2023
1 parent ed4fe10 commit ed47a77
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 10 deletions.
25 changes: 17 additions & 8 deletions tests/core/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def publisher() -> EventPublisher[MyEvent]:
def test_publishes_event(publisher: EventPublisher[MyEvent]) -> None:
event = MyEvent("a")
f: Future = Future()
publisher.subscribe(f.set_result)
publisher.subscribe(lambda r, _: f.set_result(r))
publisher.publish(event)
assert f.result(timeout=_TIMEOUT) == event

Expand All @@ -32,8 +32,8 @@ def test_multi_subscriber(publisher: EventPublisher[MyEvent]) -> None:
event = MyEvent("a")
f1: Future = Future()
f2: Future = Future()
publisher.subscribe(f1.set_result)
publisher.subscribe(f2.set_result)
publisher.subscribe(lambda r, _: f1.set_result(r))
publisher.subscribe(lambda r, _: f2.set_result(r))
publisher.publish(event)
assert f1.result(timeout=_TIMEOUT) == f2.result(timeout=_TIMEOUT) == event

Expand All @@ -43,11 +43,11 @@ def test_can_unsubscribe(publisher: EventPublisher[MyEvent]) -> None:
event_b = MyEvent("b")
event_c = MyEvent("c")
q: Queue = Queue()
sub = publisher.subscribe(q.put)
sub = publisher.subscribe(lambda r, _: q.put(r))
publisher.publish(event_a)
publisher.unsubscribe(sub)
publisher.publish(event_b)
publisher.subscribe(q.put)
publisher.subscribe(lambda r, _: q.put(r))
publisher.publish(event_c)
assert list(_drain(q)) == [event_a, event_c]

Expand All @@ -57,16 +57,25 @@ def test_can_unsubscribe_all(publisher: EventPublisher[MyEvent]) -> None:
event_b = MyEvent("b")
event_c = MyEvent("c")
q: Queue = Queue()
publisher.subscribe(q.put)
publisher.subscribe(q.put)
publisher.subscribe(lambda r, _: q.put(r))
publisher.subscribe(lambda r, _: q.put(r))
publisher.publish(event_a)
publisher.unsubscribe_all()
publisher.publish(event_b)
publisher.subscribe(q.put)
publisher.subscribe(lambda r, _: q.put(r))
publisher.publish(event_c)
assert list(_drain(q)) == [event_a, event_a, event_c]


def test_correlation_id(publisher: EventPublisher[MyEvent]) -> None:
event = MyEvent("a")
correlation_id = "foobar"
f: Future = Future()
publisher.subscribe(lambda _, c: f.set_result(c))
publisher.publish(event, correlation_id)
assert f.result(timeout=_TIMEOUT) == correlation_id


def _drain(queue: Queue) -> Iterable:
while not queue.empty():
yield queue.get_nowait()
34 changes: 32 additions & 2 deletions tests/messaging/test_stomptemplate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import itertools
from concurrent.futures import Future
from dataclasses import dataclass
from queue import Queue
from typing import Any, Iterable, Type

import pytest
Expand Down Expand Up @@ -29,6 +30,11 @@ def test_queue(template: MessagingTemplate) -> str:
return template.destinations.queue(f"test-{next(_COUNT)}")


@pytest.fixture
def test_queue_2(template: MessagingTemplate) -> str:
return template.destinations.queue(f"test-{next(_COUNT)}")


@pytest.fixture
def test_topic(template: MessagingTemplate) -> str:
return template.destinations.topic(f"test-{next(_COUNT)}")
Expand Down Expand Up @@ -141,11 +147,35 @@ def test_reconnect(template: MessagingTemplate, test_queue: str) -> None:
assert reply == "ack"


def acknowledge(template: MessagingTemplate, test_queue: str) -> None:
@pytest.mark.stomp
def test_correlation_id(
template: MessagingTemplate, test_queue: str, test_queue_2: str
) -> None:
correlation_id = "foobar"
q: Queue = Queue()

def server(ctx: MessageContext, msg: str) -> None:
q.put(ctx)
template.send(test_queue_2, msg, None, ctx.correlation_id)

def client(ctx: MessageContext, msg: str) -> None:
q.put(ctx)

template.subscribe(test_queue, server)
template.subscribe(test_queue_2, client)
template.send(test_queue, "test", None, correlation_id)

ctx_req: MessageContext = q.get(timeout=_TIMEOUT)
assert ctx_req.correlation_id == correlation_id
ctx_ack: MessageContext = q.get(timeout=_TIMEOUT)
assert ctx_ack.correlation_id == correlation_id


def acknowledge(template: MessagingTemplate, destination: str) -> None:
def server(ctx: MessageContext, message: str) -> None:
reply_queue = ctx.reply_destination
if reply_queue is None:
raise RuntimeError("reply queue is None")
template.send(reply_queue, "ack")

template.subscribe(test_queue, server)
template.subscribe(destination, server)

0 comments on commit ed47a77

Please sign in to comment.