From 51dd72b9aae7007c2752147c1bdbc422935b1bd4 Mon Sep 17 00:00:00 2001
From: Remi Hakim
Date: Thu, 3 Oct 2013 15:45:43 +0200
Subject: [PATCH] Improvements to the rabbitmq integration. Fix #682, Fix #644

---
 checks.d/rabbitmq.py | 270 ++++++++++++++++++++++++-------------------
 1 file changed, 149 insertions(+), 121 deletions(-)

diff --git a/checks.d/rabbitmq.py b/checks.d/rabbitmq.py
index 8d8b479d3b..3cd43a4190 100644
--- a/checks.d/rabbitmq.py
+++ b/checks.d/rabbitmq.py
@@ -1,10 +1,17 @@
 import urllib2
 import urlparse
+import time
 
 from checks import AgentCheck
 from util import json
 
-QUEUE_ATTRIBUTES = [
+EVENT_TYPE = SOURCE_TYPE_NAME = 'rabbitmq'
+QUEUE_TYPE = 'queues'
+NODE_TYPE = 'nodes'
+MAX_DETAILED_QUEUES = 200
+MAX_DETAILED_NODES = 100
+ALERT_THRESHOLD = 0.9 # Post an event in the stream when the number of queues or nodes to collect is above 90% of the limit
+QUEUE_ATTRIBUTES = [
     'active_consumers',
     'consumers',
     'memory',
@@ -14,31 +21,49 @@
 ]
 
 NODE_ATTRIBUTES = [
-    'disk_free',
-    'disk_free_limit',
     'fd_total',
     'fd_used',
     'mem_limit',
     'mem_used',
-    'proc_total',
-    'proc_used',
-    'processors',
     'run_queue',
     'sockets_total',
     'sockets_used',
 ]
 
-MAX_QUEUES = 5
-MAX_NODES = 3
+ATTRIBUTES = {
+    QUEUE_TYPE: QUEUE_ATTRIBUTES,
+    NODE_TYPE: NODE_ATTRIBUTES,
+}
 
-QUEUE_LIMIT = 100
-NODE_LIMIT = 100
+
+
+TAGS_MAP = {
+    QUEUE_TYPE: {
+        'node':'node',
+        'name':'queue',
+        'vhost':'vhost',
+        'policy':'policy',
+    },
+    NODE_TYPE: {
+        'name':'node',
+    }
+}
+
+METRIC_SUFFIX = {
+    QUEUE_TYPE: "queue",
+    NODE_TYPE: "node",
+}
 
 class RabbitMQ(AgentCheck):
     """This check is for gathering statistics from the RabbitMQ
     Management Plugin (http://www.rabbitmq.com/management.html)
     """
-    def check(self, instance):
+
+    def __init__(self, name, init_config, agentConfig, instances=None):
+        AgentCheck.__init__(self, name, init_config, agentConfig, instances)
+        self.already_alerted = []
+
+    def _get_config(self, instance):
         # make sure 'rabbitmq_api_url; is present
         if 'rabbitmq_api_url' not in instance:
             raise Exception('Missing "rabbitmq_api_url" in RabbitMQ config.')
@@ -50,14 +75,32 @@ def check(self, instance):
         username = instance.get('rabbitmq_user', 'guest')
         password = instance.get('rabbitmq_pass', 'guest')
 
+        # Limit of queues/nodes to collect metrics from
+        max_detailed = {
+            QUEUE_TYPE: int(instance.get('max_detailed_queues', MAX_DETAILED_QUEUES)),
+            NODE_TYPE: int(instance.get('max_detailed_nodes', MAX_DETAILED_NODES)),
+        }
+
+        # List of queues/nodes to collect metrics from
+        specified = {
+            QUEUE_TYPE: instance.get('queues', []),
+            NODE_TYPE: instance.get('nodes', []),
+        }
+
         # setup urllib2 for Basic Auth
         auth_handler = urllib2.HTTPBasicAuthHandler()
         auth_handler.add_password(realm='RabbitMQ Management', uri=base_url, user=username, passwd=password)
         opener = urllib2.build_opener(auth_handler)
         urllib2.install_opener(opener)
 
-        self.get_queue_stats(instance, base_url)
-        self.get_node_stats(instance, base_url)
+        return base_url, max_detailed, specified
+
+
+    def check(self, instance):
+        base_url, max_detailed, specified = self._get_config(instance)
+        self.get_stats(instance, base_url, QUEUE_TYPE, max_detailed[QUEUE_TYPE], specified[QUEUE_TYPE])
+        self.get_stats(instance, base_url, NODE_TYPE, max_detailed[NODE_TYPE], specified[NODE_TYPE])
+
 
     def _get_data(self, url):
         try:
@@ -69,118 +112,103 @@ def _get_data(self, url):
         return data
 
 
-    def _get_metrics_for_queue(self, queue, is_gauge=False, send_histogram=True):
-        if is_gauge:
+    def get_stats(self, instance, base_url, object_type, max_detailed, specified):
+        data = self._get_data(urlparse.urljoin(base_url, object_type))
+
+        if len(data) > ALERT_THRESHOLD * max_detailed and not specified:
+            self.alert(base_url, max_detailed, len(data), object_type)
+
+        if len(data) > max_detailed and not specified:
+            self.warning("Too many %s to fetch. You must choose the %s you are interested in by editing the rabbitmq.yaml configuration file or get in touch with Datadog Support" % (object_type, object_type))
+
+        if len(specified) > max_detailed:
+            raise Exception("The maximum number of %s you can specify is %d." % (object_type, max_detailed))
+
+        limit_reached = False
+        detailed = 0
+        for data_line in data:
+            name = data_line.get("name")
+            absolute_name = name
+
+            if object_type == QUEUE_TYPE:
+                absolute_name = '%s/%s' % (data_line.get("vhost"), name)
+
+            if len(data) < max_detailed:
+                # The number of queues or nodes is below the limit.
+                # We can collect detailed metrics for all of them
+                self._get_metrics(data_line, object_type, detailed=True)
+                detailed += 1
+
+            elif name in specified:
+                # This queue/node is specified in the config
+                # We can collect detailed metrics for it
+                self._get_metrics(data_line, object_type, detailed=True)
+                detailed += 1
+                specified.remove(name)
+
+            elif absolute_name in specified:
+                # This queue/node is specified in the config
+                # We can collect detailed metrics for it
+                self._get_metrics(data_line, object_type, detailed=True)
+                detailed += 1
+                specified.remove(absolute_name)
+
+            elif not limit_reached and not specified:
+                # No queues/nodes are specified in the config and we haven't reached the limit yet
+                # We can collect detailed metrics for this one
+                self._get_metrics(data_line, object_type, detailed=True)
+                detailed += 1
+
+            limit_reached = detailed >= max_detailed
+
+            if limit_reached or (len(data) > max_detailed and not specified):
+                self._get_metrics(data_line, object_type, detailed=False)
+
+    def _get_metrics(self, data, object_type, detailed):
+        if detailed:
             tags = []
-            tag_list = {
-                'node':'node',
-                'name':'queue',
-                'vhost':'vhost',
-                'policy':'policy',
-            }
+            tag_list = TAGS_MAP[object_type]
             for t in tag_list.keys():
-                tag = queue.get(t, None)
+                tag = data.get(t, None)
                 if tag is not None:
                     tags.append('rabbitmq_%s:%s' % (tag_list[t], tag))
 
-        else:
-            tags = None
-
-        for attribute in QUEUE_ATTRIBUTES:
-            value = queue.get(attribute, None)
+        for attribute in ATTRIBUTES[object_type]:
+            value = data.get(attribute, None)
             if value is not None:
-                if send_histogram:
-                    self.histogram('rabbitmq.queue.%s.hist' % attribute, int(value))
-                if is_gauge:
-                    self.gauge('rabbitmq.queue.%s' % attribute, int(value), tags=tags)
-
-
-    def _get_metrics_for_node(self, node, is_gauge=False, send_histogram=True):
-        if is_gauge:
-            tags = []
-            if 'name' in node:
-                tags.append('rabbitmq_node:%s' % node['name'])
+                self.histogram('rabbitmq.%s.%s.hist' % (METRIC_SUFFIX[object_type], attribute), int(value))
+                if detailed:
+                    self.gauge('rabbitmq.%s.%s' % (METRIC_SUFFIX[object_type], attribute), int(value), tags=tags)
+
+    def alert(self, base_url, max_detailed, size, object_type):
+        key = "%s%s" % (base_url, object_type)
+        if key in self.already_alerted:
+            # We already posted an event
+            return
+
+        self.already_alerted.append(key)
+
+        title = "RabbitMQ integration is approaching the limit on %s" % self.hostname
+        msg = """%s %s are present. The limit is %s.
+        Please get in touch with Datadog support to increase the limit.""" % (size, object_type, max_detailed)
+
+        event = {
+            "timestamp": int(time.time()),
+            "event_type": EVENT_TYPE,
+            "api_key": self.agentConfig['api_key'],
+            "msg_title": title,
+            "msg_text": msg,
+            "alert_type": 'warning',
+            "source_type_name": SOURCE_TYPE_NAME,
+            "host": self.hostname,
+            "tags": ["base_url:%s" % base_url, "host:%s" % self.hostname],
+            "event_object": key,
+        }
+
+        self.event(event)
+
+
+
+
+
-        for attribute in NODE_ATTRIBUTES:
-            value = node.get(attribute, None)
-            if value is not None:
-                if send_histogram:
-                    self.histogram('rabbitmq.node.%s.hist' % attribute, int(value))
-                if is_gauge:
-                    self.gauge('rabbitmq.node.%s' % attribute, int(value), tags=tags)
-
-
-    def get_queue_stats(self, instance, base_url):
-        url = urlparse.urljoin(base_url, 'queues')
-        queues = self._get_data(url)
-
-        if len(queues) > 100 and not instance.get('queues', None):
-            self.warning("Too many queues to fetch. You must choose the queues you are interested in by editing the rabbitmq.yaml configuration file")
-
-        allowed_queues = instance.get('queues', [])
-        if len(allowed_queues) > MAX_QUEUES:
-            raise Exception("The maximum number of queues you can specify is %d." % MAX_QUEUES)
-
-        if not allowed_queues:
-            allowed_queues = [q.get('name') for q in queues[:MAX_QUEUES]]
-            # If no queues are specified in the config, we only get metrics for the 5 first ones.
-            # Others will be aggregated
-
-        i = 0
-        queue_Limit_reached = False
-        for queue in queues:
-            name = queue.get('name')
-            if name in allowed_queues:
-                self._get_metrics_for_queue(queue, is_gauge=True, send_histogram=len(queues) > MAX_QUEUES)
-                allowed_queues.remove(name)
-            elif queue_Limit_reached:
-                if not allowed_queues:
-                    # We have reached the limit and we have already processed the config specified queues
-                    break
-                # We have reached the limit but some queues specified in the config still haven't been processed
-                continue
-            else:
-                self._get_metrics_for_queue(queue)
-
-            i += 1
-            if i > QUEUE_LIMIT:
-                self.warning("More than %s queues are present. Only collecting data using the 100 first" % QUEUE_LIMIT)
-                queue_Limit_reached = True
-
-
-    def get_node_stats(self, instance, base_url):
-        url = urlparse.urljoin(base_url, 'nodes')
-        nodes = self._get_data(url)
-
-        if len(nodes) > 100 and not instance.get('nodes', None):
-            self.warning("Too many queues to fetch. You must choose the queues you are interested in by editing the rabbitmq.yaml configuration file")
-
-        allowed_nodes = instance.get('nodes', [])
-        if len(allowed_nodes) > MAX_NODES:
-            raise Exception("The maximum number of nodes you can specify is %d." % MAX_NODES)
-
-        if not allowed_nodes:
-            allowed_nodes = [n.get('name') for n in nodes[:MAX_NODES]]
-            # If no nodes are specified in the config, we only get metrics for the 5 first ones.
-            # Others will be aggregated
-
-        i = 0
-        node_limit_reached = False
-        for node in nodes:
-            name = node.get('name')
-            if name in allowed_nodes:
-                self._get_metrics_for_node(node, is_gauge=True, send_histogram=len(nodes) > MAX_NODES)
-                allowed_nodes.remove(name)
-            elif node_limit_reached:
-                if not allowed_nodes:
-                    # We have reached the limit and we have already processed the config specified nodes
-                    break
-                # We have reached the limit but some nodes specified in the config still haven't been processed
-                continue
-            else:
-                self._get_metrics_for_node(node)
-
-            i += 1
-            if i > NODE_LIMIT:
-                self.warning("More than %s nodes are present. Only collecting data using the 100 first" % NODE_LIMIT)
-                node_limit_reached = True
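
Illustrative note, not part of the patch: the sketch below mirrors the selection rule introduced in get_stats() to show which queues end up with per-queue tagged gauges once the max_detailed_queues limit and the optional 'queues' list from rabbitmq.yaml come into play; every queue still feeds the aggregated rabbitmq.queue.*.hist histograms. The queue names, the lowered limit and the helper name queues_with_detailed_metrics are invented for the example, and the 'vhost/queue' matching done via absolute_name is omitted for brevity.

# Standalone sketch (assumptions noted above); it does not import the Agent.
MAX_DETAILED_QUEUES = 200  # same default as the check


def queues_with_detailed_metrics(queue_names, max_detailed, specified):
    """Return the queues that would receive per-queue (tagged) gauges."""
    detailed = []
    remaining = list(specified)
    for name in queue_names:
        if len(queue_names) < max_detailed:
            detailed.append(name)      # under the limit: every queue is detailed
        elif name in remaining:
            detailed.append(name)      # explicitly listed in the instance config
            remaining.remove(name)
        elif not remaining and len(detailed) < max_detailed:
            detailed.append(name)      # nothing (left) specified: fill up to the limit
    return detailed


if __name__ == '__main__':
    # Hypothetical instance, as it would be parsed from rabbitmq.yaml.
    instance = {
        'rabbitmq_api_url': 'http://localhost:15672/api/',
        'max_detailed_queues': 2,      # lowered from 200 to keep the example small
        'queues': ['orders', 'billing'],
    }
    discovered = ['audit', 'billing', 'emails', 'orders', 'retries']
    picked = queues_with_detailed_metrics(
        discovered,
        int(instance.get('max_detailed_queues', MAX_DETAILED_QUEUES)),
        instance.get('queues', []))
    print("per-queue gauges for: %s" % picked)   # -> ['billing', 'orders']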