Skip to content

Commit

Permalink
Add some publish integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jeremy Cline <jcline@redhat.com>
  • Loading branch information
jeremycline authored and mergify[bot] committed Jun 27, 2019
1 parent ab94a7a commit 1c6c22d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
46 changes: 46 additions & 0 deletions fedora_messaging/tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,52 @@ def test_twisted_consume_update_callback(queue_and_binding):
yield consumers2[0].cancel()


@pytest_twisted.inlineCallbacks
def test_publish_channel_error(queue_and_binding):
"""Assert publishing recovers from a channel error."""
queues, bindings = queue_and_binding

def counting_callback(message, storage=defaultdict(int)):
storage[message.topic] += 1
if storage[message.topic] == 2:
raise exceptions.HaltConsumer()

@pytest_twisted.inlineCallbacks
def delayed_publish():
"""Publish, break the channel, and publish again."""
yield threads.deferToThread(api.publish, message.Message(), "amq.topic")
protocol = yield api._twisted_service._service.factory.when_connected()
yield protocol._publish_channel.close()
yield threads.deferToThread(api.publish, message.Message(), "amq.topic")

reactor.callLater(5, delayed_publish)

deferred_consume = threads.deferToThread(
api.consume, counting_callback, bindings, queues
)
deferred_consume.addTimeout(30, reactor)
try:
yield deferred_consume
pytest.fail("consume should have raised an exception")
except (defer.TimeoutError, defer.CancelledError):
pytest.fail("Consumer did not receive both messages")
except exceptions.HaltConsumer as e:
assert 0 == e.exit_code


@pytest_twisted.inlineCallbacks
def test_protocol_publish_connection_error(queue_and_binding):
"""Assert individual protocols raise connection exceptions if closed."""
api._init_twisted_service()
protocol = yield api._twisted_service._service.factory.when_connected()
yield protocol.close()
try:
yield protocol.publish(message.Message(), "amq.topic")
pytest.fail("Expected a ConnectionException")
except exceptions.ConnectionException:
pass


@pytest_twisted.inlineCallbacks
def test_pub_sub_default_settings(queue_and_binding):
"""
Expand Down
5 changes: 4 additions & 1 deletion fedora_messaging/twisted/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,14 @@ def _allocate_channel(self):
Raises:
NoFreeChannels: If this connection has reached its maximum number of channels.
ConncetionException: If this connection is already closed.
"""
try:
channel = yield self.channel()
except pika.exceptions.NoFreeChannels:
raise NoFreeChannels()
except pika.exceptions.ConnectionWrongStateError as e:
raise ConnectionException(reason=e)
_std_log.debug("Created AMQP channel id %d", channel.channel_number)
if self._confirms:
yield channel.confirm_delivery()
Expand Down Expand Up @@ -297,7 +300,7 @@ def publish(self, message, exchange):
except (pika.exceptions.NackError, pika.exceptions.UnroutableError) as e:
_std_log.error("Message was rejected by the broker (%s)", str(e))
raise PublishReturned(reason=e)
except pika.exceptions.ChannelClosed:
except (pika.exceptions.ChannelClosed, pika.exceptions.ChannelWrongStateError):
self._publish_channel = None
yield self.publish(message, exchange)
except (
Expand Down

0 comments on commit 1c6c22d

Please sign in to comment.