import re
import sys
import types
import time
from datetime import datetime

from checks import AgentCheck
from util import get_hostname

# When running with pymongo < 2.0
# Not the full spec for mongo URIs -- just extract username and password
# http://www.mongodb.org/display/DOCS/connections6
mongo_uri_re = re.compile(r'mongodb://(?P<username>[^:@]+):(?P<password>[^:@]+)@.*')

# Matches the "user:password@" part of a mongodb:// URI so that the password
# can be masked before the URI is written to a log.
_uri_password_re = re.compile(r'(mongodb://[^:@/]+:)[^@/]*@')

# Types accepted as metric values: int/long/float on Python 2, int/float on
# interpreters that have no separate ``long`` type.
try:
    _NUMERIC_TYPES = (int, long, float)
except NameError:
    _NUMERIC_TYPES = (int, float)


def _hide_password(uri):
    """Return ``uri`` with any embedded password replaced by asterisks."""
    return _uri_password_re.sub(r'\1*****@', uri)


def _total_seconds(delta):
    """Return a ``timedelta`` as a float number of seconds.

    ``timedelta.total_seconds()`` only exists on Python >= 2.7, so the value
    is computed by hand when the method is missing.
    """
    if hasattr(delta, 'total_seconds'):
        return delta.total_seconds()
    return (delta.microseconds +
            (delta.seconds + delta.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6


class MongoDb(AgentCheck):
    """Agent check that reports MongoDB server and replica-set metrics.

    Each configured instance must provide a ``server`` (a ``mongodb://`` URI)
    and may provide a list of ``tags``. Values are read from the
    ``serverStatus``, ``dbstats`` and ``replSetGetStatus`` commands run
    against the ``admin`` database.
    """

    # Metrics submitted as-is, under the name "mongodb.<lowercased path>".
    GAUGES = [
        "indexCounters.btree.missRatio",
        "globalLock.ratio",
        "connections.current",
        "connections.available",
        "mem.resident",
        "mem.virtual",
        "mem.mapped",
        "cursors.totalOpen",
        "cursors.timedOut",
        "uptime",

        "stats.indexes",
        "stats.indexSize",
        "stats.objects",
        "stats.dataSize",
        "stats.storageSize",

        "replSet.health",
        "replSet.state",
        "replSet.replicationLag"
    ]

    # Monotonic counters submitted as rates, under "mongodb.<path>ps".
    RATES = [
        "indexCounters.btree.accesses",
        "indexCounters.btree.hits",
        "indexCounters.btree.misses",
        "opcounters.insert",
        "opcounters.query",
        "opcounters.update",
        "opcounters.delete",
        "opcounters.getmore",
        "opcounters.command",
        "asserts.regular",
        "asserts.warning",
        "asserts.msg",
        "asserts.user",
        "asserts.rollovers"
    ]

    METRICS = GAUGES + RATES

    # Human-readable names of the replica-set member states (``myState``).
    REPLSET_STATES = {
        0: 'Starting Up',
        1: 'Primary',
        2: 'Secondary',
        3: 'Recovering',
        4: 'Fatal',
        5: 'Starting up (forking threads)',
        6: 'Unknown',
        7: 'Arbiter',
        8: 'Down',
        9: 'Rollback',
        10: 'Removed'
    }

    def __init__(self, name, init_config, agentConfig):
        AgentCheck.__init__(self, name, init_config, agentConfig)

        # Most recently observed replica-set state (any server); -1 = none yet.
        self._last_state = -1
        # Last observed replica-set state per server, so that monitoring
        # several instances does not produce an event on every run.
        self._last_states = {}

    def checkLastState(self, state, agentConfig, server=None):
        """Emit an event when the replica-set state of ``server`` changed.

        ``server`` identifies the instance whose state is tracked; when it is
        omitted a single shared slot is used. Returns whatever
        ``create_event`` returns when the state changed, ``None`` otherwise.
        """
        previous = self._last_states.get(server, -1)
        self._last_state = state
        if previous != state:
            self._last_states[server] = state
            return self.create_event(state, agentConfig)

    def create_event(self, state, agentConfig):
        """Create an event with a message describing the replication
        state of a mongo node"""
        # States missing from the table are reported as 'Unknown' rather
        # than as the literal string "None".
        status = self.REPLSET_STATES.get(state, 'Unknown')
        hostname = get_hostname(agentConfig)
        msg_title = "%s is %s" % (hostname, status)
        msg = "MongoDB: %s just reported as %s" % (hostname, status)

        self.event({
            'timestamp': int(time.time()),
            'event_type': 'Mongo',
            'api_key': agentConfig['api_key'],
            'msg_title': msg_title,
            'msg_text': msg,
            'host': hostname
        })

    def check(self, instance):
        """Collect metrics from the MongoDB server described by ``instance``.

        Nothing is returned: values are submitted through ``gauge``/``rate``
        and replica-set state changes through ``event``. An instance without
        a ``server`` key is skipped with a warning. Raises ``Exception`` when
        pymongo cannot be imported; errors raised by pymongo while talking to
        the server propagate to the caller.
        """
        if 'server' not in instance:
            self.log.warning("Missing 'server' in mongo config")
            return

        server = instance['server']
        tags = instance.get('tags') or []

        try:
            from pymongo import Connection
        except ImportError:
            self.log.error('mongo.yaml exists but pymongo module can not be imported. Skipping check.')
            raise Exception('Python PyMongo Module can not be imported. Please check the installation instruction on the Datadog Website')

        username, password = self._get_credentials(server)
        # Never write the password to the logs.
        safe_server = _hide_password(server)

        do_auth = username is not None and password is not None
        if not do_auth:
            self.log.debug("Mongo: cannot extract username and password from config %s" % safe_server)

        conn = Connection(server)
        try:
            db = conn['admin']
            if do_auth:
                if not db.authenticate(username, password):
                    self.log.error("Mongo: cannot connect with config %s" % safe_server)

            status = db.command('serverStatus')  # Shorthand for {'serverStatus': 1}
            status['stats'] = db.command('dbstats')

            replset_data = self._get_replset_data(db, server)
            if replset_data is not None:
                status['replSet'] = replset_data
        finally:
            # The check runs periodically: release the sockets every time
            # instead of leaving one more open connection per run.
            conn.disconnect()

        self._submit_metrics(status, tags)

    def _get_credentials(self, server):
        """Return the ``(username, password)`` found in the ``server`` URI.

        Either element is ``None`` when it is not present in the URI.
        """
        try:
            from pymongo import uri_parser
        except ImportError:
            # uri_parser is pymongo 2.0+
            matches = mongo_uri_re.match(server)
            if matches:
                parsed = matches.groupdict()
            else:
                parsed = {}
        else:
            # Configuration a URL, mongodb://user:pass@server/db
            parsed = uri_parser.parse_uri(server)
        return parsed.get('username'), parsed.get('password')

    def _get_replset_data(self, db, server):
        """Return the replica-set metrics of the node, or ``None``.

        ``None`` is returned when ``replSetGetStatus`` fails with an
        ``OperationFailure`` mentioning the command (the case the original
        code deliberately ignored) or returns nothing. Also reports a change
        of replica-set state through ``checkLastState``.

        See http://www.mongodb.org/display/DOCS/Replica+Set+Commands#ReplicaSetCommands-replSetGetStatus
        """
        from pymongo.errors import OperationFailure

        try:
            replSet = db.command('replSetGetStatus')
        except OperationFailure:
            # sys.exc_info() rather than "except ... as e" keeps the module
            # importable on old and new interpreters alike.
            if "replSetGetStatus" in str(sys.exc_info()[1]):
                return None
            # Bare raise keeps the original traceback.
            raise

        if not replSet:
            return None

        data = {}
        primary = None
        current = None

        # find nodes: master and current node (ourself)
        for member in replSet.get('members') or []:
            if member.get('self'):
                current = member
            state = member.get('state')
            if state is not None and int(state) == 1:
                primary = member

        # If we have both we can compute a lag time
        if current is not None and primary is not None:
            if 'optimeDate' in current and 'optimeDate' in primary:
                # NOTE(review): current - primary is negative when this node
                # is behind the primary; confirm the intended sign before
                # changing it, dashboards may depend on it.
                lag = current['optimeDate'] - primary['optimeDate']
                data['replicationLag'] = _total_seconds(lag)

        if current is not None and 'health' in current:
            data['health'] = current['health']

        data['state'] = replSet['myState']
        self.checkLastState(data['state'], self.agentConfig, server)
        return data

    def _submit_metrics(self, status, tags):
        """Submit every metric of ``METRICS`` that is present in ``status``."""
        for metric in self.METRICS:
            # each metric is of the form: x.y.z with z optional
            # and can be found at status[x][y][z]
            value = status
            try:
                for key in metric.split("."):
                    value = value[key]
            except (KeyError, TypeError):
                # Not reported by this server (or the path crosses a scalar).
                continue

            # A single unexpected value must not abort the whole run, and the
            # validation must still happen when Python runs with -O.
            if isinstance(value, bool) or not isinstance(value, _NUMERIC_TYPES):
                self.log.warning("Mongo: skipping metric %s with non-numeric value %r" % (metric, value))
                continue

            # Test membership on the raw name, before it is normalized.
            name = self.normalize(metric.lower(), 'mongodb')
            if metric in self.GAUGES:
                self.gauge(name, value, tags=tags)
            elif metric in self.RATES:
                self.rate(name + "ps", value, tags=tags)

    @staticmethod
    def parse_agent_config(agentConfig):
        """Build a checks.d configuration from a legacy agent configuration.

        Returns ``False`` when ``mongodb_server`` is missing or empty,
        otherwise a dictionary holding a single instance for that server.
        """
        server = agentConfig.get('mongodb_server')
        if not server:
            return False

        return {
            'instances': [{
                'server': server
            }]
        }


if __name__ == "__main__":
    check, instances = MongoDb.from_yaml('conf.d/mongo.yaml')
    for instance in instances:
        check.check(instance)
        print(check.get_metrics())
checks.db.mysql import MySql -from checks.db.mongo import MongoDb from checks.db.mcache import Memcache from checks.queue import RabbitMq from checks.ganglia import Ganglia @@ -77,7 +76,6 @@ def __init__(self, agentConfig, emitters, systemStats): } # Old-style metric checks - self._mongodb = MongoDb(log) self._mysql = MySql(log) self._rabbitmq = RabbitMq() self._ganglia = Ganglia(log) @@ -199,7 +197,6 @@ def run(self, checksd=None, start_event=True): # Run old-style checks mysqlStatus = self._mysql.check(self.agentConfig) rabbitmq = self._rabbitmq.check(log, self.agentConfig) - mongodb = self._mongodb.check(self.agentConfig) gangliaData = self._ganglia.check(self.agentConfig) cassandraData = self._cassandra.check(log, self.agentConfig) dogstreamData = self._dogstream.check(self.agentConfig) @@ -218,13 +215,6 @@ def run(self, checksd=None, start_event=True): # RabbitMQ if rabbitmq: payload['rabbitMQ'] = rabbitmq - - # MongoDB - if mongodb: - if mongodb.has_key('events'): - events['Mongo'] = mongodb['events']['Mongo'] - del mongodb['events'] - payload['mongoDB'] = mongodb # dogstream if dogstreamData: diff --git a/checks/db/mongo.py b/checks/db/mongo.py deleted file mode 100644 index 3f934a2a95..0000000000 --- a/checks/db/mongo.py +++ /dev/null @@ -1,225 +0,0 @@ -import re -import types -from datetime import datetime - -from checks import * -from util import get_hostname - -# When running with pymongo < 2.0 -# Not the full spec for mongo URIs -- just extract username and password -# http://www.mongodb.org/display/DOCS/connections6 -mongo_uri_re=re.compile(r'mongodb://(?P[^:@]+):(?P[^:@]+)@.*') - -class MongoDb(Check): - - def __init__(self, logger): - - Check.__init__(self, logger) - - self._last_state = -1 - - self.counter("indexCounters.btree.accesses") - self.counter("indexCounters.btree.hits") - self.counter("indexCounters.btree.misses") - self.gauge("indexCounters.btree.missRatio") - self.counter("opcounters.insert") - self.counter("opcounters.query") - 
self.counter("opcounters.update") - self.counter("opcounters.delete") - self.counter("opcounters.getmore") - self.counter("opcounters.command") - self.counter("asserts.regular") - self.counter("asserts.warning") - self.counter("asserts.msg") - self.counter("asserts.user") - self.counter("asserts.rollovers") - self.gauge("globalLock.ratio") - self.gauge("connections.current") - self.gauge("connections.available") - self.gauge("mem.resident") - self.gauge("mem.virtual") - self.gauge("mem.mapped") - self.gauge("cursors.totalOpen") - self.gauge("cursors.timedOut") - self.gauge("uptime") - - self.gauge("stats.indexes") - self.gauge("stats.indexSize") - self.gauge("stats.objects") - self.gauge("stats.dataSize") - self.gauge("stats.storageSize") - - self.gauge("replSet.health") - self.gauge("replSet.state") - self.gauge("replSet.replicationLag") - - def checkLastState(self, state, agentConfig, serverVersion): - if self._last_state != state: - self._last_state = state - return self.create_event(state, agentConfig, serverVersion) - - def create_event(self, state, agentConfig, serverVersion): - """Create an event with a message describing the replication - state of a mongo node""" - - def get_state_description(state): - if state == 0: return 'Starting Up' - elif state == 1: return 'Primary' - elif state == 2: return 'Secondary' - elif state == 3: return 'Recovering' - elif state == 4: return 'Fatal' - elif state == 5: return 'Starting up (forking threads)' - elif state == 6: return 'Unknown' - elif state == 7: return 'Arbiter' - elif state == 8: return 'Down' - elif state == 9: return 'Rollback' - - return { 'timestamp': int(time.mktime(datetime.now().timetuple())), - 'event_type': 'Mongo', - 'host': get_hostname(agentConfig), - 'api_key': agentConfig['api_key'], - 'version': serverVersion, - 'state': get_state_description(state) } - - def check(self, agentConfig): - """ - Returns a dictionary that looks a lot like what's sent back by db.serverStatus() - """ - - if 
'mongodb_server' not in agentConfig or agentConfig['mongodb_server'] == '': - return False - - try: - from pymongo import Connection - try: - from pymongo import uri_parser - # Configuration a URL, mongodb://user:pass@server/db - parsed = uri_parser.parse_uri(agentConfig['mongodb_server']) - except ImportError: - self.logger.debug('Mongo: running with pymongo < 2.0, custom uri parsing') - # uri_parser is pymongo 2.0+ - matches = mongo_uri_re.match(agentConfig['mongodb_server']) - if matches: - parsed = matches.groupdict() - else: - parsed = {} - username = parsed.get('username') - password = parsed.get('password') - - do_auth = True - if username is None or password is None: - self.logger.debug("Mongo: cannot extract username and password from config %s" % agentConfig['mongodb_server']) - do_auth = False - - conn = Connection(agentConfig['mongodb_server']) - db = conn['admin'] - if do_auth: - if not db.authenticate(username, password): - self.logger.error("Mongo: cannot connect with config %s" % agentConfig['mongodb_server']) - - status = db.command('serverStatus') # Shorthand for {'serverStatus': 1} - status['stats'] = db.command('dbstats') - - results = {} - - # Handle replica data, if any - # See http://www.mongodb.org/display/DOCS/Replica+Set+Commands#ReplicaSetCommands-replSetGetStatus - try: - data = {} - - replSet = db.command('replSetGetStatus') - serverVersion = conn.server_info()['version'] - if replSet: - primary = None - current = None - - # find nodes: master and current node (ourself) - for member in replSet.get('members'): - if member.get('self'): - current = member - if int(member.get('state')) == 1: - primary = member - - # If we have both we can compute a lag time - if current is not None and primary is not None: - lag = current['optimeDate'] - primary['optimeDate'] - # Python 2.7 has this built in, python < 2.7 don't... 
- if hasattr(lag,'total_seconds'): - data['replicationLag'] = lag.total_seconds() - else: - data['replicationLag'] = (lag.microseconds + \ - (lag.seconds + lag.days * 24 * 3600) * 10**6) / 10.0**6 - - if current is not None: - data['health'] = current['health'] - - data['state'] = replSet['myState'] - event = self.checkLastState(data['state'], agentConfig, serverVersion) - if event is not None: - results['events'] = {'Mongo': [event]} - status['replSet'] = data - except: - self.logger.debug("Cannot determine replication set status", exc_info=True) - - # If these keys exist, remove them for now as they cannot be serialized - try: - status['backgroundFlushing'].pop('last_finished') - except KeyError: - pass - try: - status.pop('localTime') - except KeyError: - pass - - # Flatten the metrics first - # Collect samples - # Send a dictionary back - - for m in self.get_metric_names(): - # each metric is of the form: x.y.z with z optional - # and can be found at status[x][y][z] - value = status - try: - for c in m.split("."): - value = value[c] - except KeyError: - continue - - # value is now status[x][y][z] - assert type(value) in (types.IntType, types.LongType, types.FloatType) - - self.save_sample(m, value) - - # opposite op: x.y.z -> results[x][y][zPS], yes, ...PS for counters - try: - val = self.get_sample(m) - r = results - for c in m.split(".")[:-1]: - if c not in r: - r[c] = {} - r = r[c] - if self.is_counter(m): - suffix = m.split(".")[-1] + "PS" - else: - suffix = m.split(".")[-1] - r[suffix] = val - - except UnknownValue: - pass - - return results - - except ImportError: - self.logger.exception('Unable to import pymongo library') - return False - - except: - self.logger.exception('Unable to get MongoDB status') - return False - -if __name__ == "__main__": - import logging - agentConfig = { 'mongodb_server': 'localhost:27017', 'api_key': 'toto' } - db = MongoDb(logging) - print db.check(agentConfig) - diff --git a/conf.d/mongo.yaml.example 
b/conf.d/mongo.yaml.example new file mode 100644 index 0000000000..01b16b0cff --- /dev/null +++ b/conf.d/mongo.yaml.example @@ -0,0 +1,7 @@ +init_config: + +instances: +# - server: mongodb://localhost:27017 +# tags: +# - optional_tag1 +# - optional_tag2 \ No newline at end of file diff --git a/tests/test_mongo.py b/tests/test_mongo.py index 2b6fc846af..fd08d8e31d 100644 --- a/tests/test_mongo.py +++ b/tests/test_mongo.py @@ -6,7 +6,8 @@ import socket import pymongo -from checks.db.mongo import MongoDb + +from tests.common import load_check PORT1 = 37017 PORT2 = 37018 @@ -26,9 +27,16 @@ def wait4mongo(self, process, port): loop += 1 if loop >= MAX_WAIT: break - + def setUp(self): - self.c = MongoDb(logging.getLogger(__file__)) + self.agentConfig = { + 'version': '0.1', + 'api_key': 'toto' + } + + # Initialize the check from checks.d + self.check = load_check('mongo', {'init_config': {}, 'instances': {}}, self.agentConfig) + # Start 2 instances of Mongo in a replica set dir1 = mkdtemp() dir2 = mkdtemp() @@ -64,24 +72,148 @@ def tearDown(self): except: logging.getLogger().exception("Cannot terminate mongod instances") - def testCheck(self): - r = self.c.check({"mongodb_server": "mongodb://localhost:%s/test" % PORT1, "api_key": "abc123"}) - self.assertEquals(r and r["connections"]["current"] >= 1, True) - assert r["connections"]["available"] >= 1 - assert r["uptime"] >= 0, r - assert r["mem"]["resident"] > 0 - assert r["mem"]["virtual"] > 0 - assert "replSet" in r - - r = self.c.check({"mongodb_server": "mongodb://localhost:%s/test" % PORT2, "api_key": "abc123"}) - self.assertEquals(r and r["connections"]["current"] >= 1, True) - assert r["connections"]["available"] >= 1 - assert r["uptime"] >= 0, r - assert r["mem"]["resident"] > 0 - assert r["mem"]["virtual"] > 0 - assert "replSet" in r - + def testMongoCheck(self): + + self.config = { + 'instances': [{ + 'server': "mongodb://localhost:%s/test" % PORT1 + }, + { + 'server': "mongodb://localhost:%s/test" % PORT2 + }] + 
} + + # Test mongodb with checks.d + self.check = load_check('mongo', self.config, self.agentConfig) + + # Run the check against our running server + self.check.check(self.config['instances'][0]) + # Sleep for 1 second so the rate interval >=1 + time.sleep(1) + # Run the check again so we get the rates + self.check.check(self.config['instances'][0]) + + # Metric assertions + metrics = self.check.get_metrics() + assert metrics + self.assertTrue(type(metrics) == type([])) + self.assertTrue(len(metrics) > 0) + + metric_val_checks = { + 'mongodb.connections.current': lambda x: x >= 1, + 'mongodb.connections.available': lambda x: x >= 1, + 'mongodb.uptime': lambda x: x >= 0, + 'mongodb.mem.resident': lambda x: x > 0, + 'mongodb.mem.virtual': lambda x: x > 0 + } + + replSetCheck = False + for m in metrics: + metric_name = m[0] + if "replset" in metric_name.split("."): + replSetCheck = True + if metric_name in metric_val_checks: + self.assertTrue( metric_val_checks[metric_name]( m[2] ) ) + + self.assertTrue( replSetCheck ) + + # Run the check against our running server + self.check.check(self.config['instances'][1]) + # Sleep for 1 second so the rate interval >=1 + time.sleep(1) + # Run the check again so we get the rates + self.check.check(self.config['instances'][1]) + + # Metric assertions + metrics = self.check.get_metrics() + assert metrics + self.assertTrue(type(metrics) == type([])) + self.assertTrue(len(metrics) > 0) + + replSetCheck = False + for m in metrics: + metric_name = m[0] + if "replset" in metric_name.split("."): + replSetCheck = True + if metric_name in metric_val_checks: + self.assertTrue( metric_val_checks[metric_name]( m[2] ) ) + + self.assertTrue( replSetCheck ) + + def testMongoOldConfig(self): + self.agentConfig1 = { + 'mongodb_server': "mongodb://localhost:%s/test" % PORT1, + 'version': '0.1', + 'api_key': 'toto' + } + conf1 = self.check.parse_agent_config(self.agentConfig1) + self.agentConfig2 = { + 'mongodb_server': 
"mongodb://localhost:%s/test" % PORT2, + 'version': '0.1', + 'api_key': 'toto' + } + conf2 = self.check.parse_agent_config(self.agentConfig2) + + # Test the first mongodb instance + self.check = load_check('mongo', conf1, self.agentConfig1) + + # Run the check against our running server + self.check.check(conf1['instances'][0]) + # Sleep for 1 second so the rate interval >=1 + time.sleep(1) + # Run the check again so we get the rates + self.check.check(conf1['instances'][0]) + + # Metric assertions + metrics = self.check.get_metrics() + assert metrics + self.assertTrue(type(metrics) == type([])) + self.assertTrue(len(metrics) > 0) + + metric_val_checks = { + 'mongodb.connections.current': lambda x: x >= 1, + 'mongodb.connections.available': lambda x: x >= 1, + 'mongodb.uptime': lambda x: x >= 0, + 'mongodb.mem.resident': lambda x: x > 0, + 'mongodb.mem.virtual': lambda x: x > 0 + } + + replSetCheck = False + for m in metrics: + metric_name = m[0] + if "replset" in metric_name.split("."): + replSetCheck = True + if metric_name in metric_val_checks: + self.assertTrue( metric_val_checks[metric_name]( m[2] ) ) + + self.assertTrue( replSetCheck ) + + + # Test the second mongodb instance + self.check = load_check('mongo', conf2, self.agentConfig2) + + # Run the check against our running server + self.check.check(conf2['instances'][0]) + # Sleep for 1 second so the rate interval >=1 + time.sleep(1) + # Run the check again so we get the rates + self.check.check(conf2['instances'][0]) + + # Metric assertions + metrics = self.check.get_metrics() + assert metrics + self.assertTrue(type(metrics) == type([])) + self.assertTrue(len(metrics) > 0) + + replSetCheck = False + for m in metrics: + metric_name = m[0] + if "replset" in metric_name.split("."): + replSetCheck = True + if metric_name in metric_val_checks: + self.assertTrue( metric_val_checks[metric_name]( m[2] ) ) + + self.assertTrue( replSetCheck ) if __name__ == '__main__': unittest.main() -