From efa82401502c87f5f8eeed87e09fbe16a286eca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Bompard?= Date: Thu, 11 Jul 2024 17:07:50 +0200 Subject: [PATCH] Add an embedded HTTP server to monitor the service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See the "Monitoring" section in : https://fedora-messaging.readthedocs.io/en/develop/user-guide/consuming.html Fixes: #380 Signed-off-by: Aurélien Bompard --- config.toml.example | 4 + docs/api/api.rst | 4 +- docs/user-guide/consuming.rst | 36 +++++++++ fedora_messaging/api.py | 25 ++++-- fedora_messaging/config.py | 29 +++++++ fedora_messaging/twisted/consumer.py | 19 ++++- fedora_messaging/twisted/factory.py | 16 ++++ fedora_messaging/twisted/monitor.py | 110 ++++++++++++++++++++++++++ fedora_messaging/twisted/service.py | 8 ++ fedora_messaging/twisted/stats.py | 85 ++++++++++++++++++++ news/380.feature | 1 + tests/conftest.py | 12 +++ tests/integration/test_api.py | 33 +++++++- tests/integration/test_cli.py | 112 ++++++++++++++++++++++++++- tests/unit/test_api.py | 27 ++++++- tests/unit/test_config.py | 31 ++++++++ tests/unit/twisted/test_consumer.py | 25 ++++++ tests/unit/twisted/test_factory.py | 15 ++++ tests/unit/twisted/test_monitor.py | 70 +++++++++++++++++ tests/unit/twisted/test_service.py | 21 ++++- tests/unit/twisted/test_stats.py | 61 +++++++++++++++ tests/utils.py | 36 +++++++++ 22 files changed, 761 insertions(+), 19 deletions(-) create mode 100644 fedora_messaging/twisted/monitor.py create mode 100644 fedora_messaging/twisted/stats.py create mode 100644 news/380.feature create mode 100644 tests/unit/twisted/test_monitor.py create mode 100644 tests/unit/twisted/test_stats.py create mode 100644 tests/utils.py diff --git a/config.toml.example b/config.toml.example index e80cd1a5..a4e742f6 100644 --- a/config.toml.example +++ b/config.toml.example @@ -40,6 +40,10 @@ example_key = "for my consumer" prefetch_size = 0 prefetch_count = 25 +# 
[monitoring] +# Set this if you want to enable HTTP-based monitoring of the service +# port = 8070 + [log_config] version = 1 disable_existing_loggers = true diff --git a/docs/api/api.rst b/docs/api/api.rst index f6101abc..dc45fda8 100644 --- a/docs/api/api.rst +++ b/docs/api/api.rst @@ -129,8 +129,8 @@ SERIALIZED_MESSAGE_SCHEMA .. autodata:: fedora_messaging.message.SERIALIZED_MESSAGE_SCHEMA -Utilities -========= +Schema Utilities +================ .. automodule:: fedora_messaging.schema_utils diff --git a/docs/user-guide/consuming.rst b/docs/user-guide/consuming.rst index ce0c8330..ca65bc2f 100644 --- a/docs/user-guide/consuming.rst +++ b/docs/user-guide/consuming.rst @@ -197,6 +197,42 @@ consumers to use if they need configuration options. Refer to the :ref:`conf-consumer-config` in the Configuration documentation for details. +Monitoring +========== + +A Fedora Messaging consumer can start a dedicated HTTP server to let users +monitor its state. You can configure the port in the configuration file under +the ``[monitoring]`` section, with ``address`` and ``port``. +If the section is empty, the monitoring service will be disabled. +The default value for ``address`` is an empty string, which means that the +monitoring server will listen on all interfaces. +There is no default value for ``port``, you will have to choose a port. + +When a consumer is running with the monitoring server enabled, you can get the +following data:: + + $ curl http://localhost:8070/live + {"status": "OK"} + + $ curl http://localhost:8070/ready + {"consuming": true, "published": 0, "consumed": {"received": 0, "processed": 0, "dropped": 0, "rejected": 0, "failed": 0}} + +The ``/live`` endpoint always returns the same JSON data. +The statistics in the ``/ready`` endpoint are gathered from the start of the +service. They mean: + +- ``consuming``: whether the consumer is running or not. +- ``published``: amount of messages published on the bus. 
+- ``consumed.received``: amount of messages received from the bus. +- ``consumed.processed``: amount of messages successfully processed by the consumer. +- ``consumed.dropped``: amount of messages dropped by the consumer (by raising the + :class:`fedora_messaging.exceptions.Drop` exception). +- ``consumed.rejected``: amount of messages rejected by the consumer (by raising the + :class:`fedora_messaging.exceptions.Nack` exception). Those messages were put + back in the queue. +- ``consumed.failed``: amount of messages that caused another exception to be raised. + + systemd Service =============== diff --git a/fedora_messaging/api.py b/fedora_messaging/api.py index 5cd81afe..775e2ae4 100644 --- a/fedora_messaging/api.py +++ b/fedora_messaging/api.py @@ -15,7 +15,7 @@ SEVERITIES, ) from .signals import pre_publish_signal, publish_failed_signal, publish_signal -from .twisted import service +from .twisted import monitor, service from .twisted.consumer import Consumer @@ -43,13 +43,22 @@ def _init_twisted_service(): global _twisted_service - if _twisted_service is None: - _twisted_service = service.FedoraMessagingServiceV2(config.conf["amqp_url"]) - reactor.callWhenRunning(_twisted_service.startService) - # Twisted is killing the underlying connection before stopService gets - # called, so we need to add it as a pre-shutdown event to gracefully - # finish up messages in progress. 
- reactor.addSystemEventTrigger("before", "shutdown", _twisted_service.stopService) + if _twisted_service is not None: + return + + _twisted_service = service.FedoraMessagingServiceV2(config.conf["amqp_url"]) + if config.conf["monitoring"]: + monitor.monitor_service( + _twisted_service, + address=config.conf["monitoring"]["address"], + port=config.conf["monitoring"]["port"], + ) + + reactor.callWhenRunning(_twisted_service.startService) + # Twisted is killing the underlying connection before stopService gets + # called, so we need to add it as a pre-shutdown event to gracefully + # finish up messages in progress. + reactor.addSystemEventTrigger("before", "shutdown", _twisted_service.stopService) def _check_callback(callback): diff --git a/fedora_messaging/config.py b/fedora_messaging/config.py index 436c405a..31783529 100644 --- a/fedora_messaging/config.py +++ b/fedora_messaging/config.py @@ -287,6 +287,17 @@ def callback(message): 'prefetch_count': 10, 'prefetch_size': 0, } + +.. _conf-monitoring: + +monitoring +---------- +The options for the embedded HTTP server dedicated to monitoring the service. +This is where you can configure the address and the port to be listened on. +If the section is empty, monitoring will be disabled. +The default value for ``address`` is an empty string, which means that the +service will listen on all interfaces. There is no default value for +``port``, you will have to choose a port. """ @@ -350,6 +361,7 @@ def callback(message): bindings=[{"queue": _default_queue_name, "exchange": "amq.topic", "routing_keys": ["#"]}], qos={"prefetch_size": 0, "prefetch_count": 10}, callback=None, + monitoring={}, consumer_config={}, tls={"ca_cert": None, "certfile": None, "keyfile": None}, log_config={ @@ -460,6 +472,22 @@ def validate_client_properties(props): ) +def validate_monitoring(monitoring_conf): + """ + Validate the monitoring setting. + + This will add the "address" key if it is missing; the "port" key is mandatory. 
+ """ + if not monitoring_conf: + return # If empty, monitoring will be disabled. + if "port" not in monitoring_conf: + raise exceptions.ConfigurationException( + "The port must be defined in [monitoring] to activate it" + ) + if "address" not in monitoring_conf: + monitoring_conf["address"] = "" + + class LazyConfig(dict): """This class lazy-loads the configuration file.""" @@ -510,6 +538,7 @@ def _validate(self): validate_queues(self["queues"]) validate_bindings(self["bindings"]) validate_client_properties(self["client_properties"]) + validate_monitoring(self["monitoring"]) def load_config(self, config_path=None): """ diff --git a/fedora_messaging/twisted/consumer.py b/fedora_messaging/twisted/consumer.py index cae1a906..bb9392c0 100644 --- a/fedora_messaging/twisted/consumer.py +++ b/fedora_messaging/twisted/consumer.py @@ -35,6 +35,7 @@ ValidationError, ) from ..message import get_message +from .stats import ConsumerStatistics _std_log = logging.getLogger(__name__) @@ -110,17 +111,24 @@ def __init__(self, queue=None, callback=None): # The unique ID for the AMQP consumer. self._tag = str(uuid.uuid4()) # Used in the consumer read loop to know when it's being canceled. - self._running = True + self._running = False # The current read loop self._read_loop = None # The protocol that currently runs this consumer, used when cancel is # called to remove itself from the protocol and its factory so it doesn't # restart on the next connection. 
self._protocol = None + # Message statistics + self.stats = ConsumerStatistics() def __repr__(self): return f"Consumer(queue={self.queue}, callback={self.callback})" + @property + def running(self) -> bool: + """Whether the consumer is running.""" + return self._running + @defer.inlineCallbacks def consume(self): yield self._channel.basic_qos( @@ -144,6 +152,7 @@ def consume(self): except AttributeError: pass # pika 1.0.0+ + self._running = True self._read_loop = self._read(queue_object) self._read_loop.addErrback(self._read_loop_errback) @@ -207,6 +216,8 @@ def _read_one(self, queue_object): yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False) return + self.stats.received += 1 + try: _std_log.info( "Consuming message from topic %s (message id %s)", @@ -221,19 +232,24 @@ def _read_one(self, queue_object): except Nack: _std_log.warning("Returning message id %s to the queue", properties.message_id) yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True) + self.stats.rejected += 1 except Drop: _std_log.warning("Consumer requested message id %s be dropped", properties.message_id) yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False) + self.stats.dropped += 1 except HaltConsumer as e: _std_log.info("Consumer indicated it wishes consumption to halt, shutting down") if e.requeue: yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True) + self.stats.rejected += 1 else: yield channel.basic_ack(delivery_tag=delivery_frame.delivery_tag) + self.stats.processed += 1 raise e except Exception as e: _std_log.exception("Received unexpected exception from consumer %r", self) yield channel.basic_nack(delivery_tag=0, multiple=True, requeue=True) + self.stats.failed += 1 raise e else: _std_log.info( @@ -242,6 +258,7 @@ def _read_one(self, queue_object): properties.message_id, ) yield channel.basic_ack(delivery_tag=delivery_frame.delivery_tag) + self.stats.processed += 1 def 
_on_cancel_callback(self, frame): """ diff --git a/fedora_messaging/twisted/factory.py b/fedora_messaging/twisted/factory.py index 28ba13da..0cde40f8 100644 --- a/fedora_messaging/twisted/factory.py +++ b/fedora_messaging/twisted/factory.py @@ -37,6 +37,7 @@ from ..exceptions import ConnectionException from .protocol import FedoraMessagingProtocolV2 +from .stats import ConsumerStatistics, FactoryStatistics _std_log = logging.getLogger(__name__) @@ -86,6 +87,7 @@ def __init__(self, parameters, confirms=True): self._client_deferred = defer.Deferred() self._client = None self._consumers = [] + self._stats = FactoryStatistics() def __repr__(self): """Return the representation of the factory as a string""" @@ -232,6 +234,7 @@ def publish(self, message, exchange): protocol = yield self.when_connected() try: yield protocol.publish(message, exchange) + self._stats.published += 1 break except ConnectionException: _std_log.info("Publish failed on %r, waiting for new connection", protocol) @@ -315,3 +318,16 @@ def _forget_consumer(self, queue): queue (str): Forget the consumers that consume from this queue. """ self._consumers = [record for record in self._consumers if record.consumer.queue != queue] + + @property + def stats(self) -> ConsumerStatistics: + """Statistics about this factory's consumer(s).""" + self._stats.consumed = sum( + (record.consumer.stats for record in self._consumers), start=ConsumerStatistics() + ) + return self._stats + + @property + def consuming(self) -> bool: + """Whether the consumer(s) is currently running.""" + return any(record.consumer.running for record in self._consumers) diff --git a/fedora_messaging/twisted/monitor.py b/fedora_messaging/twisted/monitor.py new file mode 100644 index 00000000..aa12c5f8 --- /dev/null +++ b/fedora_messaging/twisted/monitor.py @@ -0,0 +1,110 @@ +# This file is part of fedora_messaging. +# Copyright (C) 2018 Red Hat, Inc. 
+# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +A Twisted HTTP service to monitor a Fedora Messaging Service. + +This module provides a HTTP service that can be used to implement health checks in OpenShift, as +described here: https://docs.openshift.com/container-platform/4.16/applications/application-health.html + +The webserver will listen on the port set in the configuration file, and provides two endpoints that +return JSON data: +- `/live` to check when the program is up +- `/ready` to check when the consumer is connected, and get the statistics +""" + +import abc +import json +import typing + +from twisted.application.internet import TCPServer +from twisted.web import resource, server + + +if typing.TYPE_CHECKING: + from .service import FedoraMessagingServiceV2 + + +class FMServiceResource(resource.Resource, metaclass=abc.ABCMeta): + """An abstract class for service-monitoring endpoints.""" + + def __init__(self, *args, **kwargs): + self._fm_service = kwargs.pop("fm_service") + super().__init__(*args, **kwargs) + + @abc.abstractmethod + def _get_response(self) -> dict: + """Return the response as a dictionary.""" + raise NotImplementedError + + def render_GET(self, request): + request.setHeader("Content-Type", "application/json ") + return json.dumps(self._get_response()).encode("utf-8") + 
b"\n" + + +class Live(FMServiceResource): + """The `/live` endpoint, returns JSON""" + + isLeaf = True + + def _get_response(self): + return {"status": "OK"} + + +class Ready(FMServiceResource): + """The `/ready` endpoint + + Returns the consumer state and some statistics about messages consumed and produced in + JSON format. + """ + + isLeaf = True + + def _get_response(self): + response = {"consuming": self._fm_service.consuming} + response.update(self._fm_service.stats.as_dict()) + return response + + +class MonitoringSite(server.Site): + """A subclass of Twisted's site to redefine its name in the logs.""" + + def logPrefix(self): + return "Monitoring HTTP server" + + +def monitor_service( + fm_service: "FedoraMessagingServiceV2", *, address: str, port: int +) -> TCPServer: + """Add the Twisted service for HTTP-based monitoring to the provided Fedora Messaging Service. + + Args: + fm_service: the service to monitor + address: the IP address to listen on + port: the TCP port to listen on + + Returns: + The monitoring service + """ + root = resource.Resource() + root.putChild(b"live", Live(fm_service=fm_service)) + root.putChild(b"ready", Ready(fm_service=fm_service)) + site = MonitoringSite(root) + monitor_service = TCPServer(port, site, interface=address) + monitor_service.setName("monitoring") + monitor_service.setServiceParent(fm_service) + return monitor_service diff --git a/fedora_messaging/twisted/service.py b/fedora_messaging/twisted/service.py index 67abe349..ebd2e87d 100644 --- a/fedora_messaging/twisted/service.py +++ b/fedora_messaging/twisted/service.py @@ -98,6 +98,14 @@ def stopService(self): yield self._service.factory.stopFactory() yield service.MultiService.stopService(self) + @property + def stats(self): + return self._service.factory.stats + + @property + def consuming(self): + return self._service.factory.consuming + def _configure_tls_parameters(parameters): """ diff --git a/fedora_messaging/twisted/stats.py 
b/fedora_messaging/twisted/stats.py new file mode 100644 index 00000000..94e35971 --- /dev/null +++ b/fedora_messaging/twisted/stats.py @@ -0,0 +1,85 @@ +# This file is part of fedora_messaging. +# Copyright (C) 2018 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Datastructures to store consumer and producer statistics. +""" + + +from typing import Any + + +class Statistics: + """A datastructure to manage integers as attributes.""" + + names = [] + + def __init__(self): + for name in self.names: + setattr(self, name, 0) + + def __setattr__(self, name: str, value: Any) -> None: + if name not in self.names: + raise AttributeError( + f"{self.__class__.__name__} does not have a {name} attribute. " + f"Available attributes: {', '.join(sorted(self.names))}." + ) + return super().__setattr__(name, value) + + def __add__(self, other): + if not isinstance(other, self.__class__): + raise TypeError( + f"{self.__class__.__name__} instances can only be added to other " + f"{self.__class__.__name__} instances." 
+ ) + new_stats = self.__class__() + for name in self.names: + setattr(new_stats, name, getattr(self, name) + getattr(other, name)) + return new_stats + + def as_dict(self): + return {name: getattr(self, name) for name in self.names} + + def __repr__(self): + return f"<{self.__class__.__name__} {self.as_dict()}>" + + +class ConsumerStatistics(Statistics): + """Statistics for a :class:`Consumer`.""" + + names = ( + "received", + "processed", + "dropped", + "rejected", + "failed", + ) + + +class FactoryStatistics(Statistics): + """Statistics for a :class:`FedoraMessagingFactoryV2`.""" + + names = ("published", "consumed") + + def __init__(self): + super().__init__() + self.consumed = ConsumerStatistics() + + def as_dict(self): + d = super().as_dict() + d["consumed"] = self.consumed.as_dict() + return d diff --git a/news/380.feature b/news/380.feature new file mode 100644 index 00000000..8bdb2575 --- /dev/null +++ b/news/380.feature @@ -0,0 +1 @@ +Add an embedded HTTP server to monitor the service, see the "Monitoring" section in diff --git a/tests/conftest.py b/tests/conftest.py index 7d06b23d..3b39f72a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,6 +20,8 @@ import crochet import pytest +from .utils import get_available_port + @pytest.fixture(autouse=True, scope="session") def crochet_no_setup(): @@ -29,3 +31,13 @@ def crochet_no_setup(): @pytest.fixture def fixtures_dir(): return os.path.abspath(os.path.join(os.path.dirname(__file__), "fixtures/")) + + +@pytest.fixture +def available_port(): + try: + import pytest_twisted + except ImportError: + pytest.skip("pytest-twisted is missing, skipping tests", allow_module_level=True) + + return pytest_twisted.blockon(get_available_port()) diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index d5c652a5..18cb6b42 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -155,9 +155,15 @@ def callback(message): assert expected_headers == m._headers server_queue 
= yield get_queue(queues) assert server_queue["consumers"] == 0 + # Verify stats + assert consumers[0].stats.received == 3 + assert consumers[0].stats.processed == 3 + assert consumers[0].stats.rejected == 0 except (defer.TimeoutError, defer.CancelledError): yield consumers[0].cancel() pytest.fail("Timeout reached without consumer halting!") + else: + yield consumers[0].cancel() @pytest_twisted.inlineCallbacks @@ -240,7 +246,6 @@ def test_twisted_consume_halt_consumer_requeue(queue_and_binding): ) def callback(message): - """Count to 3 and quit.""" raise exceptions.HaltConsumer(exit_code=1, requeue=True) # Assert that the number of consumers we think we started is the number the @@ -259,9 +264,15 @@ def callback(message): server_queue = yield get_queue(queues) assert server_queue["consumers"] == 0 assert server_queue["messages_ready"] == 1 + # Verify stats + assert consumers[0].stats.received == 1 + assert consumers[0].stats.processed == 0 + assert consumers[0].stats.rejected == 1 except (defer.TimeoutError, defer.CancelledError): yield consumers[0].cancel() pytest.fail("Timeout reached without consumer halting!") + else: + yield consumers[0].cancel() @pytest_twisted.inlineCallbacks @@ -298,9 +309,14 @@ def callback(message): server_queue = yield get_queue(queues) assert server_queue["consumers"] == 0 assert server_queue["messages"] == 0 + # Verify stats + assert consumers[0].stats.received == 2 + assert consumers[0].stats.processed == 1 # The message raising HaltConsumer + assert consumers[0].stats.dropped == 1 except (defer.TimeoutError, defer.CancelledError): + yield consumers[0].cancel() pytest.fail("Timeout reached without consumer halting!") - finally: + else: yield consumers[0].cancel() @@ -336,9 +352,14 @@ def callback(message): server_queue = yield get_queue(queues) assert server_queue["consumers"] == 0 assert server_queue["messages"] == 0 + # Verify stats + assert consumers[0].stats.received == 2 + assert consumers[0].stats.processed == 1 # The message 
raising HaltConsumer + assert consumers[0].stats.rejected == 1 except (defer.TimeoutError, defer.CancelledError): + yield consumers[0].cancel() pytest.fail("Timeout reached without consumer halting!") - finally: + else: yield consumers[0].cancel() @@ -370,6 +391,7 @@ def callback(message): yield consumers[0].result pytest.fail("Expected an exception to be raised.") except (defer.TimeoutError, defer.CancelledError): + yield consumers[0].cancel() pytest.fail("Timeout reached without consumer halting!") except Exception as e: # Assert the message was delivered and re-queued when the consumer crashed. @@ -377,6 +399,9 @@ def callback(message): server_queue = yield get_queue(queues) assert server_queue["consumers"] == 0 assert server_queue["messages"] == 1 + # Verify stats + assert consumers[0].stats.received == 1 + assert consumers[0].stats.failed == 1 finally: yield consumers[0].cancel() @@ -448,6 +473,8 @@ def callback(message): except (defer.TimeoutError, defer.CancelledError): yield consumers[0].cancel() pytest.fail("Timeout reached without consumer halting!") + else: + yield consumers[0].cancel() @pytest_twisted.inlineCallbacks diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py index dfb4643c..0f7acf4e 100644 --- a/tests/integration/test_cli.py +++ b/tests/integration/test_cli.py @@ -24,7 +24,8 @@ import pytest import pytest_twisted import requests -from twisted.internet import threads +from twisted.internet import error, reactor, threads +from twisted.web.client import Agent, readBody from fedora_messaging import api, exceptions, message @@ -32,8 +33,14 @@ @pytest.fixture -def cli_conf(fixtures_dir): - return os.path.join(fixtures_dir, "cli_integration.toml") +def cli_conf(fixtures_dir, tmp_path, available_port): + config_path = tmp_path.joinpath("config.toml") + with open(config_path, "w") as config_fh: + with open(os.path.join(fixtures_dir, "cli_integration.toml")) as ref_fh: + config_fh.write(ref_fh.read()) + config_fh.write("\n") + 
config_fh.write(f"[monitoring]\nport = {available_port}\n") + return config_path def halt_exit_0(message): @@ -51,6 +58,17 @@ def fail_processing(message): raise ValueError() +def execute_action(message): + if message.body.get("action") == "halt": + raise exceptions.HaltConsumer() + elif message.body.get("action") == "drop": + raise exceptions.Drop() + elif message.body.get("action") == "reject": + raise exceptions.Nack() + elif message.body.get("action") == "fail": + raise ValueError() + + @pytest.fixture def queue(scope="function"): queue = str(uuid.uuid4()) @@ -73,6 +91,7 @@ def queue(scope="function"): @pytest_twisted.inlineCallbacks def test_consume_halt_with_exitcode(callback, exit_code, msg, queue, cli_conf): """Assert user execution halt with reason and exit_code is reported.""" + cmd = shutil.which("fedora-messaging") args = [ sys.executable, @@ -99,3 +118,90 @@ def test_consume_halt_with_exitcode(callback, exit_code, msg, queue, cli_conf): assert process.returncode == exit_code, process.stderr.read() assert msg in process.stdout.read() + + +@pytest_twisted.inlineCallbacks +def test_consume_monitoring(queue, cli_conf, available_port): + """Assert the monitoring works.""" + cmd = shutil.which("fedora-messaging") + args = [ + sys.executable, + cmd, + f"--conf={cli_conf}", + "consume", + "--callback=tests.integration.test_cli:execute_action", + f"--queue-name={queue}", + "--exchange=amq.topic", + "--routing-key=#", + ] + + http_client = Agent(reactor) + base_url = f"http://localhost:{available_port}".encode("ascii") + + # Monitoring not available yet + with pytest.raises(error.ConnectionRefusedError): + yield http_client.request(b"GET", base_url + b"/live") + + # Start the consumer + process = subprocess.Popen(args) # noqa: S603 + + # Wait for the consumer to start up + for _ in range(5): + yield sleep(1) + try: + response = yield http_client.request(b"GET", base_url + b"/live") + except error.ConnectionRefusedError: + continue + # Check the monitoring on 
startup + body = yield readBody(response) + assert body == b'{"status": "OK"}\n' + response = yield http_client.request(b"GET", base_url + b"/ready") + body = yield readBody(response) + assert body == ( + b'{"consuming": true, "published": 0, "consumed": {"received": 0, "processed": 0, "dropped": 0, ' + b'"rejected": 0, "failed": 0}}\n' + ) + break + else: + pytest.fail(f"Monitoring didn't start: {process.stdout.read()} -- {process.stderr.read()}") + + # Publish a message + yield threads.deferToThread(api.publish, message.Message()) + yield sleep(0.5) + + # Check stats + response = yield http_client.request(b"GET", base_url + b"/ready") + body = yield readBody(response) + assert body == ( + b'{"consuming": true, "published": 0, "consumed": {"received": 1, "processed": 1, "dropped": 0, ' + b'"rejected": 0, "failed": 0}}\n' + ) + + # Publish a message and drop it + yield threads.deferToThread(api.publish, message.Message(body={"action": "drop"})) + yield sleep(0.5) + + # Check stats + response = yield http_client.request(b"GET", base_url + b"/ready") + body = yield readBody(response) + assert body == ( + b'{"consuming": true, "published": 0, "consumed": {"received": 2, "processed": 1, "dropped": 1, ' + b'"rejected": 0, "failed": 0}}\n' + ) + + # Don't check the reject action or the message will be put back in the queue and loop forever + # Don't check a generic processing failure because it stops the consumer instantly and we can't look + # at the stats + + # Now stop + yield threads.deferToThread(api.publish, message.Message(body={"action": "halt"})) + + for _ in range(5): + yield sleep(1) + if process.poll() is not None: + break + else: + process.kill() + pytest.fail(f"Process never stopped!: {process.stdout.read()}") + + assert process.returncode == 0, process.stderr.read() diff --git a/tests/unit/test_api.py b/tests/unit/test_api.py index 61b40826..bdda8f63 100644 --- a/tests/unit/test_api.py +++ b/tests/unit/test_api.py @@ -30,7 +30,7 @@ publish_failed_signal, 
# --- tests/unit/test_api.py ---
from fedora_messaging.twisted import consumer, monitor


@pytest.fixture
def clear_twisted_service():
    """Reset ``api._twisted_service`` to None before the test and again after it."""
    api._twisted_service = None
    yield
    api._twisted_service = None


def test_monitoring_enabled(clear_twisted_service, available_port):
    """Assert a "monitoring" service exists when a monitoring port is configured."""
    with mock.patch.dict(config.conf["monitoring"], {"port": available_port, "address": ""}):
        api._init_twisted_service()
    try:
        monitoring_service = api._twisted_service.getServiceNamed("monitoring")
    except KeyError:
        pytest.fail("Monitoring service wasn't started.")
    # The service was created with (port, site, ...) as positional arguments.
    assert monitoring_service.args[0] == available_port
    assert isinstance(monitoring_service.args[1], monitor.MonitoringSite)


def test_monitoring_disabled(clear_twisted_service):
    """Assert no "monitoring" service is registered with the default configuration."""
    api._init_twisted_service()
    with pytest.raises(KeyError):
        api._twisted_service.getServiceNamed("monitoring")
    assert len(api._twisted_service.services) == 1  # only the consumer


# --- tests/unit/test_config.py ---
# Raw configuration file contents used by the monitoring tests below.
empty_monitoring_config = b"[monitoring]\n"
monitoring_config_with_port = b"[monitoring]\nport = 42\n"
monitoring_config_without_port = b"[monitoring]\naddress = ''\n"


# The three tests below are methods of a test class whose header is outside this chunk.
@mock.patch("fedora_messaging.config.open", mock.mock_open(read_data=empty_monitoring_config))
@mock.patch("fedora_messaging.config.os.path.exists", return_value=True)
def test_empty_monitoring_section(self, mock_exists):
    """Assert an empty [monitoring] section loads as an empty dict (no port required)."""
    config = msg_config.LazyConfig().load_config()
    assert config["monitoring"] == {}


@mock.patch(
    "fedora_messaging.config.open", mock.mock_open(read_data=monitoring_config_without_port)
)
@mock.patch("fedora_messaging.config.os.path.exists", return_value=True)
def test_monitoring_section_without_port(self, mock_exists):
    """Assert the monitoring port is mandatory once the section has any content."""
    with pytest.raises(ConfigurationException) as cm:
        msg_config.LazyConfig().load_config()
    assert cm.value.message == "The port must be defined in [monitoring] to activate it"


@mock.patch(
    "fedora_messaging.config.open", mock.mock_open(read_data=monitoring_config_with_port)
)
@mock.patch("fedora_messaging.config.os.path.exists", return_value=True)
def test_monitoring_section_with_port(self, mock_exists):
    """Assert the monitoring address default is set if absent"""
    config = msg_config.LazyConfig().load_config()
    assert config["monitoring"]["port"] == 42
    assert config["monitoring"]["address"] == ""
self.consumer._running = True + assert self.consumer.running is True + yield self.consumer._read(queue) assert get_message.call_args_list == [ @@ -178,11 +181,14 @@ def test_read(self, mocker): (tuple(), dict(delivery_tag="dt2")), (tuple(), dict(delivery_tag="dt3")), ] + assert self.consumer.stats.received == 3 + assert self.consumer.stats.processed == 3 @pytest_twisted.inlineCallbacks def test_read_not_running(self): # When not running, _read() should do nothing. self.consumer._running = False + assert self.consumer.running is False queue = Mock() queue.get.side_effect = lambda: defer.succeed(None) yield self.consumer._read(queue) @@ -196,6 +202,7 @@ def test_message_invalid(self): self.consumer._channel.basic_nack.assert_called_with( delivery_tag="delivery_tag", requeue=False ) + assert self.consumer.stats.received == 0 @pytest.mark.parametrize("error_class", [HaltConsumer, ValueError]) @pytest_twisted.inlineCallbacks @@ -225,12 +232,19 @@ def test_read_exception(self, mocker, error_class): pytest.fail(f"This should have raised {error_class}") self.consumer.cancel.assert_called_once_with() + assert self.consumer.stats.received == 1 + assert self.consumer.stats.rejected == 0 + assert self.consumer.stats.dropped == 0 if error_class == HaltConsumer: self.consumer._channel.basic_ack.assert_called_once_with(delivery_tag="dt1") + assert self.consumer.stats.processed == 1 + assert self.consumer.stats.failed == 0 else: self.consumer._channel.basic_nack.assert_called_once_with( delivery_tag=0, multiple=True, requeue=True ) + assert self.consumer.stats.failed == 1 + assert self.consumer.stats.processed == 0 # Handling read errors @@ -388,6 +402,8 @@ def test_read(self, mock_class): yield _call_read_one(consumer, "testing.topic", {}, {"key": "value"}) callback.assert_called_once() consumer._channel.basic_ack.assert_called_once_with(delivery_tag="delivery_tag") + assert consumer.stats.received == 1 + assert consumer.stats.processed == 1 @pytest_twisted.inlineCallbacks def 
test_nack(self, mock_class): @@ -397,6 +413,8 @@ def test_nack(self, mock_class): yield _call_read_one(consumer, "testing.topic", {}, {"key": "value"}) callback.assert_called() consumer._channel.basic_nack.assert_called_with(delivery_tag="delivery_tag", requeue=True) + assert consumer.stats.received == 1 + assert consumer.stats.rejected == 1 @pytest_twisted.inlineCallbacks def test_drop(self, mock_class): @@ -406,6 +424,8 @@ def test_drop(self, mock_class): yield _call_read_one(consumer, "testing.topic", {}, {"key": "value"}) callback.assert_called() consumer._channel.basic_nack.assert_called_with(delivery_tag="delivery_tag", requeue=False) + assert consumer.stats.received == 1 + assert consumer.stats.dropped == 1 @pytest.mark.parametrize("requeue", [False, True]) @pytest_twisted.inlineCallbacks @@ -422,12 +442,15 @@ def test_halt(self, mock_class, requeue): callback.assert_called() channel = consumer._channel + assert consumer.stats.received == 1 if requeue: channel.basic_ack.assert_not_called() channel.basic_nack.assert_called_with(delivery_tag="delivery_tag", requeue=True) + assert consumer.stats.rejected == 1 else: channel.basic_ack.assert_called_with(delivery_tag="delivery_tag") channel.basic_nack.assert_not_called() + assert consumer.stats.processed == 1 @pytest_twisted.inlineCallbacks def test_exception(self, mock_class): @@ -446,6 +469,8 @@ def test_exception(self, mock_class): callback.assert_called() channel = consumer._channel channel.basic_nack.assert_called_with(delivery_tag=0, multiple=True, requeue=True) + assert consumer.stats.received == 1 + assert consumer.stats.failed == 1 class TestAddTimeout: diff --git a/tests/unit/twisted/test_factory.py b/tests/unit/twisted/test_factory.py index 9399c65a..b019654b 100644 --- a/tests/unit/twisted/test_factory.py +++ b/tests/unit/twisted/test_factory.py @@ -169,6 +169,7 @@ def _publish(_): def _check(publish_result): self.protocol.publish.assert_called_once_with(message, exchange) + assert 
self.factory.stats.published == 1 d.addCallback(_publish) d.addCallback(_check) @@ -212,6 +213,20 @@ def _check(_): self.protocol.bind_queues.assert_called_once_with(expected_bindings) self.protocol.consume.assert_called_once_with(callback, declared_queue) + assert self.factory.consuming is False + consumer._running = True + assert self.factory.consuming is True + assert self.factory.stats.as_dict() == { + "published": 0, + "consumed": { + "received": 0, + "processed": 0, + "dropped": 0, + "rejected": 0, + "failed": 0, + }, + } + d.addCallback(_consume) d.addCallback(_check) return d diff --git a/tests/unit/twisted/test_monitor.py b/tests/unit/twisted/test_monitor.py new file mode 100644 index 00000000..c4f9c1c3 --- /dev/null +++ b/tests/unit/twisted/test_monitor.py @@ -0,0 +1,70 @@ +# This file is part of fedora_messaging. +# Copyright (C) 2018 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
import pytest
from twisted.application.service import MultiService
from twisted.web.client import Agent, readBody

from fedora_messaging.twisted.monitor import monitor_service
from fedora_messaging.twisted.stats import ConsumerStatistics


try:
    import pytest_twisted
except ImportError:
    pytest.skip("pytest-twisted is missing, skipping tests", allow_module_level=True)


@pytest.fixture
def service(available_port):
    """Yield a started MultiService that ``monitor_service`` was applied to; stop it afterwards."""
    parent = MultiService()
    monitor_service(parent, address="127.0.0.1", port=available_port)
    parent.startService()
    yield parent
    parent.stopService()


@pytest.fixture
def client():
    """Return a Twisted HTTP ``Agent`` built on the reactor."""
    # The reactor is imported at call time rather than at module import time.
    from twisted.internet import reactor

    return Agent(reactor)


class TestMonitorService:
    """HTTP-level checks of the ``/live`` and ``/ready`` monitoring endpoints."""

    @pytest_twisted.inlineCallbacks
    def test_liveness(self, available_port, service, client):
        """``/live`` answers with a fixed OK status document."""
        url = f"http://localhost:{available_port}/live".encode("ascii")
        response = yield client.request(b"GET", url)
        payload = yield readBody(response)
        assert payload == b'{"status": "OK"}\n'

    @pytest_twisted.inlineCallbacks
    def test_readiness(self, available_port, service, client):
        """``/ready`` reports the ``consuming`` flag and the counters set on the service."""
        # The test attaches these attributes to the service itself before the request.
        service.consuming = True
        service.stats = ConsumerStatistics()
        service.stats.received = 42
        url = f"http://localhost:{available_port}/ready".encode("ascii")
        response = yield client.request(b"GET", url)
        payload = yield readBody(response)
        assert payload == (
            b'{"consuming": true, "received": 42, "processed": 0, "dropped": 0, "rejected": 0, '
            b'"failed": 0}\n'
        )
def test_stats(self):
    """Assert ``stats`` and ``consuming`` on the service reflect a consumer added to its factory."""
    service = FedoraMessagingServiceV2("amqp://")

    # Freshly created service: every counter is zero and nothing is consuming.
    zeroed = {
        "published": 0,
        "consumed": {"received": 0, "processed": 0, "dropped": 0, "rejected": 0, "failed": 0},
    }
    assert service.stats.as_dict() == zeroed
    assert service.consuming is False

    # Register a running consumer that already carries some counters.
    running_consumer = Consumer()
    running_consumer._running = True
    running_consumer.stats.received = 42
    running_consumer.stats.processed = 43
    record = ConsumerRecord(running_consumer, None, None)
    service._service.factory._consumers.append(record)

    with_consumer = {
        "published": 0,
        "consumed": {"received": 42, "processed": 43, "dropped": 0, "rejected": 0, "failed": 0},
    }
    assert service.stats.as_dict() == with_consumer
    assert service.consuming is True
import pytest

from fedora_messaging.twisted.stats import ConsumerStatistics


def test_repr():
    """``repr()`` and ``str()`` of a fresh ConsumerStatistics both equal the expected text."""
    # NOTE(review): the expected text is empty in this copy of the patch; an
    # angle-bracketed repr may have been stripped -- confirm against
    # ConsumerStatistics.__repr__ before relying on this test.
    expected = ""
    as_repr = repr(ConsumerStatistics())
    as_str = str(ConsumerStatistics())
    assert as_repr == expected
    assert as_str == expected


def test_stats_add():
    """Adding two ConsumerStatistics sums their counters field by field."""
    left = ConsumerStatistics()
    left.received = 42
    left.processed = 43

    right = ConsumerStatistics()
    right.received = 1
    right.processed = 2
    right.dropped = 10

    total = left + right
    assert total.as_dict() == {
        "received": 43,
        "processed": 45,
        "dropped": 10,
        "rejected": 0,
        "failed": 0,
    }


def test_stats_add_bad_type():
    """Adding something that is not a ConsumerStatistics raises TypeError with a clear message."""
    with pytest.raises(TypeError) as handler:
        ConsumerStatistics() + 42
    message = str(handler.value)
    assert message == (
        "ConsumerStatistics instances can only be added to other ConsumerStatistics instances."
    )


def test_stats_bad_attr():
    """Setting an unknown attribute raises AttributeError naming the available attributes."""
    with pytest.raises(AttributeError) as handler:
        ConsumerStatistics().dummy = 42
    message = str(handler.value)
    assert message == (
        "ConsumerStatistics does not have a dummy attribute. Available attributes: dropped, "
        "failed, processed, received, rejected."
    )
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from random import randrange

from twisted.internet import defer, error, protocol


@defer.inlineCallbacks
def get_available_port():
    """Find a TCP port on 127.0.0.1 that can currently be listened on.

    Random port numbers are tried until ``reactor.listenTCP`` succeeds. The
    probe listener is then closed and the port number becomes the result.

    Returns:
        Deferred: fires with the port number (int).
    """
    # The reactor is imported at call time rather than at module import time.
    from twisted.internet import reactor

    dummy_server = protocol.ServerFactory()
    while True:
        port = randrange(1025, 65534)  # noqa: S311
        try:
            twisted_port = reactor.listenTCP(port, dummy_server, interface="127.0.0.1")
        except error.CannotListenError:
            # This port could not be bound: draw another one.
            continue
        # Close the probe listener before handing the number out. The port is
        # not reserved afterwards, so the caller should bind it promptly.
        yield twisted_port.stopListening()
        # A plain return replaces the deprecated defer.returnValue().
        return port