Skip to content

Commit

Permalink
Add an embedded HTTP server to monitor the service
Browse files Browse the repository at this point in the history
See the "Monitoring" section in :
https://fedora-messaging.readthedocs.io/en/develop/user-guide/consuming.html

Fixes: #380

Signed-off-by: Aurélien Bompard <aurelien@bompard.org>
  • Loading branch information
abompard committed Jul 23, 2024
1 parent d29f1f2 commit 0895c7f
Show file tree
Hide file tree
Showing 22 changed files with 761 additions and 19 deletions.
4 changes: 4 additions & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ example_key = "for my consumer"
prefetch_size = 0
prefetch_count = 25

# [monitoring]
# Set this if you want to enable HTTP-based monitoring of the service
# port = 8070

[log_config]
version = 1
disable_existing_loggers = true
Expand Down
4 changes: 2 additions & 2 deletions docs/api/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ SERIALIZED_MESSAGE_SCHEMA
.. autodata:: fedora_messaging.message.SERIALIZED_MESSAGE_SCHEMA


Utilities
=========
Schema Utilities
================

.. automodule:: fedora_messaging.schema_utils

Expand Down
36 changes: 36 additions & 0 deletions docs/user-guide/consuming.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,42 @@ consumers to use if they need configuration options. Refer to the
:ref:`conf-consumer-config` in the Configuration documentation for details.


Monitoring
==========

A Fedora Messaging consumer can start a dedicated HTTP server to let users
monitor its state. The server is configured in the ``[monitoring]`` section of
the configuration file, using the ``address`` and ``port`` options.
If the section is empty, the monitoring service will be disabled.
The default value for ``address`` is an empty string, which means that the
monitoring server will listen on all interfaces.
There is no default value for ``port``: you have to choose one.

When a consumer is running with the monitoring server enabled, you can get the
following data::

$ curl http://localhost:8070/live
{"status": "OK"}

$ curl http://localhost:8070/ready
{"consuming": true, "published": 0, "consumed": {"received": 0, "processed": 0, "dropped": 0, "rejected": 0, "failed": 0}}

The ``/live`` endpoint always returns the same JSON data.
The statistics in the ``/ready`` endpoint are gathered from the start of the
service. They mean:

- ``consuming``: whether the consumer is running or not.
- ``published``: number of messages published on the bus.
- ``consumed.received``: number of messages received from the bus.
- ``consumed.processed``: number of messages successfully processed by the consumer.
- ``consumed.dropped``: number of messages dropped by the consumer (by raising the
:class:`fedora_messaging.exceptions.Drop` exception).
- ``consumed.rejected``: number of messages rejected by the consumer (by raising the
:class:`fedora_messaging.exceptions.Nack` exception). Those messages were put
back in the queue.
- ``consumed.failed``: number of messages that caused another exception to be raised.


systemd Service
===============

Expand Down
25 changes: 17 additions & 8 deletions fedora_messaging/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
SEVERITIES,
)
from .signals import pre_publish_signal, publish_failed_signal, publish_signal
from .twisted import service
from .twisted import monitor, service
from .twisted.consumer import Consumer


Expand Down Expand Up @@ -43,13 +43,22 @@

def _init_twisted_service():
# Create the module-level Twisted service on first use and schedule its
# start and graceful stop with the reactor.
# NOTE(review): this span is a rendered diff. The block from
# "if _twisted_service is None:" down to the first addSystemEventTrigger call
# looks like the pre-change body removed by this commit, followed by the new
# body — confirm against the repository before relying on it.
global _twisted_service
if _twisted_service is None:
_twisted_service = service.FedoraMessagingServiceV2(config.conf["amqp_url"])
reactor.callWhenRunning(_twisted_service.startService)
# Twisted is killing the underlying connection before stopService gets
# called, so we need to add it as a pre-shutdown event to gracefully
# finish up messages in progress.
reactor.addSystemEventTrigger("before", "shutdown", _twisted_service.stopService)
# The service already exists: nothing to set up again.
if _twisted_service is not None:
return

_twisted_service = service.FedoraMessagingServiceV2(config.conf["amqp_url"])
# An empty "monitoring" configuration is falsy, so the HTTP monitoring
# service is only attached when the setting has content.
if config.conf["monitoring"]:
monitor.monitor_service(
_twisted_service,
address=config.conf["monitoring"]["address"],
port=config.conf["monitoring"]["port"],
)

reactor.callWhenRunning(_twisted_service.startService)
# Twisted is killing the underlying connection before stopService gets
# called, so we need to add it as a pre-shutdown event to gracefully
# finish up messages in progress.
reactor.addSystemEventTrigger("before", "shutdown", _twisted_service.stopService)


def _check_callback(callback):
Expand Down
29 changes: 29 additions & 0 deletions fedora_messaging/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,17 @@ def callback(message):
'prefetch_count': 10,
'prefetch_size': 0,
}
.. _conf-monitoring:
monitoring
----------
The options for the embedded HTTP server dedicated to monitoring the service.
This is where you can configure the address and the port to be listened on.
If the section is empty, monitoring will be disabled.
The default value for ``address`` is an empty string, which means that the
service will listen on all interfaces. There is no default value for
``port``: you have to choose one.
"""


Expand Down Expand Up @@ -350,6 +361,7 @@ def callback(message):
bindings=[{"queue": _default_queue_name, "exchange": "amq.topic", "routing_keys": ["#"]}],
qos={"prefetch_size": 0, "prefetch_count": 10},
callback=None,
monitoring={},
consumer_config={},
tls={"ca_cert": None, "certfile": None, "keyfile": None},
log_config={
Expand Down Expand Up @@ -460,6 +472,22 @@ def validate_client_properties(props):
)


def validate_monitoring(monitoring_conf):
    """
    Validate the monitoring setting and fill in its optional key.

    An empty configuration is left untouched: it means that monitoring is
    disabled. Otherwise ``port`` is mandatory, and ``address`` is set to an
    empty string when it is missing. The mapping is modified in place.

    Args:
        monitoring_conf (dict): The content of the ``monitoring`` setting.

    Raises:
        exceptions.ConfigurationException: If the setting is not empty but
            does not define ``port``.
    """
    if monitoring_conf:
        if "port" not in monitoring_conf:
            raise exceptions.ConfigurationException(
                "The port must be defined in [monitoring] to activate it"
            )
        # Keep the address chosen by the user, only fill it in when absent.
        monitoring_conf.setdefault("address", "")


class LazyConfig(dict):
"""This class lazy-loads the configuration file."""

Expand Down Expand Up @@ -510,6 +538,7 @@ def _validate(self):
validate_queues(self["queues"])
validate_bindings(self["bindings"])
validate_client_properties(self["client_properties"])
validate_monitoring(self["monitoring"])

def load_config(self, config_path=None):
"""
Expand Down
19 changes: 18 additions & 1 deletion fedora_messaging/twisted/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
ValidationError,
)
from ..message import get_message
from .stats import ConsumerStatistics


_std_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -110,17 +111,24 @@ def __init__(self, queue=None, callback=None):
# The unique ID for the AMQP consumer.
self._tag = str(uuid.uuid4())
# Used in the consumer read loop to know when it's being canceled.
self._running = True
self._running = False
# The current read loop
self._read_loop = None
# The protocol that currently runs this consumer, used when cancel is
# called to remove itself from the protocol and its factory so it doesn't
# restart on the next connection.
self._protocol = None
# Message statistics
self.stats = ConsumerStatistics()

def __repr__(self):
    """Return a short description naming the consumer's queue and callback."""
    return "Consumer(queue={}, callback={})".format(self.queue, self.callback)

@property
def running(self) -> bool:
    """Tell whether this consumer is currently flagged as running."""
    is_running = self._running
    return is_running

@defer.inlineCallbacks
def consume(self):
yield self._channel.basic_qos(
Expand All @@ -144,6 +152,7 @@ def consume(self):
except AttributeError:
pass # pika 1.0.0+

self._running = True
self._read_loop = self._read(queue_object)
self._read_loop.addErrback(self._read_loop_errback)

Expand Down Expand Up @@ -207,6 +216,8 @@ def _read_one(self, queue_object):
yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False)
return

self.stats.received += 1

try:
_std_log.info(
"Consuming message from topic %s (message id %s)",
Expand All @@ -221,19 +232,24 @@ def _read_one(self, queue_object):
except Nack:
_std_log.warning("Returning message id %s to the queue", properties.message_id)
yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True)
self.stats.rejected += 1
except Drop:
_std_log.warning("Consumer requested message id %s be dropped", properties.message_id)
yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False)
self.stats.dropped += 1
except HaltConsumer as e:
_std_log.info("Consumer indicated it wishes consumption to halt, shutting down")
if e.requeue:
yield channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True)
self.stats.rejected += 1
else:
yield channel.basic_ack(delivery_tag=delivery_frame.delivery_tag)
self.stats.processed += 1
raise e
except Exception as e:
_std_log.exception("Received unexpected exception from consumer %r", self)
yield channel.basic_nack(delivery_tag=0, multiple=True, requeue=True)
self.stats.failed += 1
raise e
else:
_std_log.info(
Expand All @@ -242,6 +258,7 @@ def _read_one(self, queue_object):
properties.message_id,
)
yield channel.basic_ack(delivery_tag=delivery_frame.delivery_tag)
self.stats.processed += 1

def _on_cancel_callback(self, frame):
"""
Expand Down
16 changes: 16 additions & 0 deletions fedora_messaging/twisted/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from ..exceptions import ConnectionException
from .protocol import FedoraMessagingProtocolV2
from .stats import ConsumerStatistics, FactoryStatistics


_std_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,6 +87,7 @@ def __init__(self, parameters, confirms=True):
self._client_deferred = defer.Deferred()
self._client = None
self._consumers = []
self._stats = FactoryStatistics()

def __repr__(self):
"""Return the representation of the factory as a string"""
Expand Down Expand Up @@ -232,6 +234,7 @@ def publish(self, message, exchange):
protocol = yield self.when_connected()
try:
yield protocol.publish(message, exchange)
self._stats.published += 1
break
except ConnectionException:
_std_log.info("Publish failed on %r, waiting for new connection", protocol)
Expand Down Expand Up @@ -315,3 +318,16 @@ def _forget_consumer(self, queue):
queue (str): Forget the consumers that consume from this queue.
"""
self._consumers = [record for record in self._consumers if record.consumer.queue != queue]

@property
def stats(self) -> FactoryStatistics:
    """
    Statistics about this factory: published and consumed messages.

    The ``consumed`` part is recomputed on every access by adding up the
    statistics of all the consumers currently registered with this factory.

    Returns:
        FactoryStatistics: The factory's statistics object, with its
        ``consumed`` attribute refreshed.
    """
    consumer_stats = (record.consumer.stats for record in self._consumers)
    # Start from an empty ConsumerStatistics so the sum also works without consumers.
    self._stats.consumed = sum(consumer_stats, start=ConsumerStatistics())
    return self._stats

@property
def consuming(self) -> bool:
    """Tell whether at least one of this factory's consumers is running."""
    for record in self._consumers:
        if record.consumer.running:
            return True
    return False
110 changes: 110 additions & 0 deletions fedora_messaging/twisted/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# This file is part of fedora_messaging.
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""
A Twisted HTTP service to monitor a Fedora Messaging Service.
This module provides an HTTP service that can be used to implement health checks in OpenShift, as
described here: https://docs.openshift.com/container-platform/4.16/applications/application-health.html
The webserver will listen on the port set in the configuration file, and provides two endpoints that
return JSON data:
- `/live` to check when the program is up
- `/ready` to check when the consumer is connected, and get the statistics
"""

import abc
import json
import typing

from twisted.application.internet import TCPServer
from twisted.web import resource, server


if typing.TYPE_CHECKING:
from .service import FedoraMessagingServiceV2


class FMServiceResource(resource.Resource, metaclass=abc.ABCMeta):
    """An abstract class for service-monitoring endpoints.

    Subclasses implement :meth:`_get_response`; the dictionary it returns is sent as the
    JSON body of the response to ``GET`` requests.

    Args:
        fm_service: the monitored service, required and passed as a keyword argument.
    """

    def __init__(self, *args, **kwargs):
        # Take our own argument out before handing the rest over to the parent class.
        self._fm_service = kwargs.pop("fm_service")
        super().__init__(*args, **kwargs)

    @abc.abstractmethod
    def _get_response(self) -> dict:
        """Return the response as a dictionary."""
        raise NotImplementedError

    def render_GET(self, request):
        """Return the response as UTF-8 encoded JSON followed by a newline."""
        # The media type must not carry trailing whitespace.
        request.setHeader("Content-Type", "application/json")
        return json.dumps(self._get_response()).encode("utf-8") + b"\n"


class Live(FMServiceResource):
    """The `/live` endpoint: answers with a constant JSON document."""

    isLeaf = True

    def _get_response(self):
        """Return the fixed liveness payload."""
        return dict(status="OK")


class Ready(FMServiceResource):
    """The `/ready` endpoint

    Reports, in JSON format, whether the monitored service is consuming, together with
    the statistics exposed by that service.
    """

    isLeaf = True

    def _get_response(self):
        """Return the consuming flag merged with the service's statistics."""
        monitored = self._fm_service
        # The statistics come last so that they win over the flag on a key clash.
        return {"consuming": monitored.consuming, **monitored.stats.as_dict()}


class MonitoringSite(server.Site):
    """Twisted site subclass that uses its own name as log prefix."""

    def logPrefix(self):
        """Return the prefix naming this site in the logs."""
        prefix = "Monitoring HTTP server"
        return prefix


def monitor_service(
    fm_service: "FedoraMessagingServiceV2", *, address: str, port: int
) -> TCPServer:
    """Attach an HTTP-based monitoring service to the provided Fedora Messaging Service.

    The site serves the ``live`` and ``ready`` endpoints. The TCP server is named
    ``monitoring`` and gets ``fm_service`` as its service parent.

    Args:
        fm_service: the service to monitor
        address: the IP address to listen on
        port: the TCP port to listen on

    Returns:
        The monitoring service
    """
    endpoints = {b"live": Live, b"ready": Ready}
    root = resource.Resource()
    for path, endpoint_class in endpoints.items():
        root.putChild(path, endpoint_class(fm_service=fm_service))
    http_server = TCPServer(port, MonitoringSite(root), interface=address)
    http_server.setName("monitoring")
    http_server.setServiceParent(fm_service)
    return http_server
Loading

0 comments on commit 0895c7f

Please sign in to comment.