Improvements to the rabbitmq integration
Fix #682, Fix #644
remh committed Oct 3, 2013
1 parent 2c510a0 commit 51dd72b
Showing 1 changed file with 149 additions and 121 deletions.
270 changes: 149 additions & 121 deletions checks.d/rabbitmq.py
@@ -1,10 +1,17 @@
import urllib2
import urlparse
import time

from checks import AgentCheck
from util import json

QUEUE_ATTRIBUTES = [
EVENT_TYPE = SOURCE_TYPE_NAME = 'rabbitmq'
QUEUE_TYPE = 'queues'
NODE_TYPE = 'nodes'
MAX_DETAILED_QUEUES = 200
MAX_DETAILED_NODES = 100
ALERT_THRESHOLD = 0.9 # Post an event in the stream when the number of queues or nodes to collect is above 90% of the limit
QUEUE_ATTRIBUTES = [
'active_consumers',
'consumers',
'memory',
@@ -14,31 +21,49 @@
]

NODE_ATTRIBUTES = [
'disk_free',
'disk_free_limit',
'fd_total',
'fd_used',
'mem_limit',
'mem_used',
'proc_total',
'proc_used',
'processors',
'run_queue',
'sockets_total',
'sockets_used',
]

MAX_QUEUES = 5
MAX_NODES = 3
ATTRIBUTES = {
QUEUE_TYPE: QUEUE_ATTRIBUTES,
NODE_TYPE: NODE_ATTRIBUTES,
}

QUEUE_LIMIT = 100
NODE_LIMIT = 100


TAGS_MAP = {
QUEUE_TYPE: {
'node':'node',
'name':'queue',
'vhost':'vhost',
'policy':'policy',
},
NODE_TYPE: {
'name':'node',
}
}

METRIC_SUFFIX = {
QUEUE_TYPE: "queue",
NODE_TYPE: "node",
}

class RabbitMQ(AgentCheck):
"""This check is for gathering statistics from the RabbitMQ
Management Plugin (http://www.rabbitmq.com/management.html)
"""
def check(self, instance):

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
self.already_alerted = []

def _get_config(self, instance):
# make sure 'rabbitmq_api_url' is present
if 'rabbitmq_api_url' not in instance:
raise Exception('Missing "rabbitmq_api_url" in RabbitMQ config.')
@@ -50,14 +75,32 @@ def check(self, instance):
username = instance.get('rabbitmq_user', 'guest')
password = instance.get('rabbitmq_pass', 'guest')

# Limit of queues/nodes to collect metrics from
max_detailed = {
QUEUE_TYPE: int(instance.get('max_detailed_queues', MAX_DETAILED_QUEUES)),
NODE_TYPE: int(instance.get('max_detailed_nodes', MAX_DETAILED_NODES)),
}

# List of queues/nodes to collect metrics from
specified = {
QUEUE_TYPE: instance.get('queues', []),
NODE_TYPE: instance.get('nodes', []),
}

# setup urllib2 for Basic Auth
auth_handler = urllib2.HTTPBasicAuthHandler()
auth_handler.add_password(realm='RabbitMQ Management', uri=base_url, user=username, passwd=password)
opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)

self.get_queue_stats(instance, base_url)
self.get_node_stats(instance, base_url)
return base_url, max_detailed, specified


def check(self, instance):
base_url, max_detailed, specified = self._get_config(instance)
self.get_stats(instance, base_url, QUEUE_TYPE, max_detailed[QUEUE_TYPE], specified[QUEUE_TYPE])
self.get_stats(instance, base_url, NODE_TYPE, max_detailed[NODE_TYPE], specified[NODE_TYPE])


def _get_data(self, url):
try:
@@ -69,118 +112,103 @@ def _get_data(self, url):
return data


def _get_metrics_for_queue(self, queue, is_gauge=False, send_histogram=True):
if is_gauge:
def get_stats(self, instance, base_url, object_type, max_detailed, specified):
data = self._get_data(urlparse.urljoin(base_url, object_type))

if len(data) > ALERT_THRESHOLD * max_detailed and not specified:
self.alert(base_url, max_detailed, len(data), object_type)

if len(data) > max_detailed and not specified:
self.warning("Too many queues to fetch. You must choose the queues you are interested in by editing the rabbitmq.yaml configuration file or get in touch with Datadog Support")

if len(specified) > max_detailed:
raise Exception("The maximum number of %s you can specify is %d." % (object_type, max_detailed))

limit_reached = False
detailed = 0
for data_line in data:
name = data_line.get("name")
absolute_name = name

if object_type == QUEUE_TYPE:
absolute_name = '%s/%s' % (data_line.get("vhost"), name)

if len(data) < max_detailed:
# The number of queues or nodes is below the limit.
# We can collect detailed metrics for those
self._get_metrics(data_line, object_type, detailed=True)
detailed += 1

elif name in specified:
# This queue/node is specified in the config
# We can collect detailed metrics for those
self._get_metrics(data_line, object_type, detailed=True)
detailed += 1
specified.remove(name)

elif absolute_name in specified:
# This queue/node is specified in the config
# We can collect detailed metrics for those
self._get_metrics(data_line, object_type, detailed=True)
detailed += 1
specified.remove(absolute_name)

elif not limit_reached and not specified:
# No queues/nodes are specified in the config but we haven't reached the limit yet
# We can collect detailed metrics for those
self._get_metrics(data_line, object_type, detailed=True)
detailed += 1

limit_reached = detailed >= max_detailed

if limit_reached or len(data) > max_detailed and not specified:
self._get_metrics(data_line, object_type, detailed=False)

def _get_metrics(self, data, object_type, detailed):
if detailed:
tags = []
tag_list = {
'node':'node',
'name':'queue',
'vhost':'vhost',
'policy':'policy',
}
tag_list = TAGS_MAP[object_type]
for t in tag_list.keys():
tag = queue.get(t, None)
tag = data.get(t, None)
if tag is not None:
tags.append('rabbitmq_%s:%s' % (tag_list[t], tag))

else:
tags = None

for attribute in QUEUE_ATTRIBUTES:
value = queue.get(attribute, None)
for attribute in ATTRIBUTES[object_type]:
value = data.get(attribute, None)
if value is not None:
if send_histogram:
self.histogram('rabbitmq.queue.%s.hist' % attribute, int(value))
if is_gauge:
self.gauge('rabbitmq.queue.%s' % attribute, int(value), tags=tags)


def _get_metrics_for_node(self, node, is_gauge=False, send_histogram=True):
if is_gauge:
tags = []
if 'name' in node:
tags.append('rabbitmq_node:%s' % node['name'])
self.histogram('rabbitmq.%s.%s.hist' % (METRIC_SUFFIX[object_type], attribute), int(value))
if detailed:
self.gauge('rabbitmq.%s.%s' % (METRIC_SUFFIX[object_type], attribute), int(value), tags=tags)

def alert(self, base_url, max_detailed, size, object_type):
key = "%s%s" % (base_url, object_type)
if key in self.already_alerted:
# We already posted an event
return

self.already_alerted.append(key)

title = "RabbitMQ integration is approaching the limit on %s" % self.hostname
msg = """%s %s are present. The limit is %s.
Please get in touch with Datadog support to increase the limit.""" % (size, object_type, max_detailed)

event = {
"timestamp": int(time.time()),
"event_type": EVENT_TYPE,
"api_key": self.agentConfig['api_key'],
"msg_title": title,
"msg_text": msg,
"alert_type": 'warning',
"source_type_name": SOURCE_TYPE_NAME,
"host": self.hostname,
"tags": ["base_url:%s" % base_url, "host:%s" % self.hostname],
"event_object": key,
}

self.event(event)






for attribute in NODE_ATTRIBUTES:
value = node.get(attribute, None)
if value is not None:
if send_histogram:
self.histogram('rabbitmq.node.%s.hist' % attribute, int(value))
if is_gauge:
self.gauge('rabbitmq.node.%s' % attribute, int(value), tags=tags)


def get_queue_stats(self, instance, base_url):
url = urlparse.urljoin(base_url, 'queues')
queues = self._get_data(url)

if len(queues) > 100 and not instance.get('queues', None):
self.warning("Too many queues to fetch. You must choose the queues you are interested in by editing the rabbitmq.yaml configuration file")

allowed_queues = instance.get('queues', [])
if len(allowed_queues) > MAX_QUEUES:
raise Exception("The maximum number of queues you can specify is %d." % MAX_QUEUES)

if not allowed_queues:
allowed_queues = [q.get('name') for q in queues[:MAX_QUEUES]]
# If no queues are specified in the config, we only get metrics for the 5 first ones.
# Others will be aggregated

i = 0
queue_Limit_reached = False
for queue in queues:
name = queue.get('name')
if name in allowed_queues:
self._get_metrics_for_queue(queue, is_gauge=True, send_histogram=len(queues) > MAX_QUEUES)
allowed_queues.remove(name)
elif queue_Limit_reached:
if not allowed_queues:
# We have reached the limit and we have already processed the config specified queues
break
# We have reached the limit but some queues specified in the config still haven't been processed
continue
else:
self._get_metrics_for_queue(queue)

i += 1
if i > QUEUE_LIMIT:
self.warning("More than %s queues are present. Only collecting data using the 100 first" % QUEUE_LIMIT)
queue_Limit_reached = True


def get_node_stats(self, instance, base_url):
url = urlparse.urljoin(base_url, 'nodes')
nodes = self._get_data(url)

if len(nodes) > 100 and not instance.get('nodes', None):
self.warning("Too many queues to fetch. You must choose the queues you are interested in by editing the rabbitmq.yaml configuration file")

allowed_nodes = instance.get('nodes', [])
if len(allowed_nodes) > MAX_NODES:
raise Exception("The maximum number of nodes you can specify is %d." % MAX_NODES)

if not allowed_nodes:
allowed_nodes = [n.get('name') for n in nodes[:MAX_NODES]]
# If no nodes are specified in the config, we only get metrics for the 5 first ones.
# Others will be aggregated

i = 0
node_limit_reached = False
for node in nodes:
name = node.get('name')
if name in allowed_nodes:
self._get_metrics_for_node(node, is_gauge=True, send_histogram=len(nodes) > MAX_NODES)
allowed_nodes.remove(name)
elif node_limit_reached:
if not allowed_nodes:
# We have reached the limit and we have already processed the config specified nodes
break
# We have reached the limit but some nodes specified in the config still haven't been processed
continue
else:
self._get_metrics_for_node(node)

i += 1
if i > NODE_LIMIT:
self.warning("More than %s nodes are present. Only collecting data using the 100 first" % NODE_LIMIT)
node_limit_reached = True

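Below is a minimal, hypothetical sketch of an instance configuration for the reworked check, written as the Python dict that the agent hands to RabbitMQ.check() for each entry under instances in rabbitmq.yaml. Only the key names come from _get_config() in the diff above; the URL, credentials, limits, and queue/node names are placeholder values.

# Hypothetical instance dict; key names are taken from _get_config() above,
# every value is a placeholder.
instance = {
    'rabbitmq_api_url': 'http://localhost:15672/api/',  # required; the check raises if missing
    'rabbitmq_user': 'guest',            # optional, defaults to 'guest'
    'rabbitmq_pass': 'guest',            # optional, defaults to 'guest'
    'max_detailed_queues': 200,          # optional, defaults to MAX_DETAILED_QUEUES
    'max_detailed_nodes': 100,           # optional, defaults to MAX_DETAILED_NODES
    # Optional lists of objects to collect detailed (tagged) metrics for.
    # Queue entries may be given as bare names or as 'vhost/name', matching
    # the absolute_name lookup in get_stats().
    'queues': ['myvhost/important_queue', 'celery'],
    'nodes': ['rabbit@localhost'],
}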