diff --git a/checks.d/mongo.py b/checks.d/mongo.py
index 5002f2013f..4648f2f26f 100644
--- a/checks.d/mongo.py
+++ b/checks.d/mongo.py
@@ -50,6 +50,12 @@ class MongoDb(AgentCheck):
         "opcounters.delete",
         "opcounters.getmore",
         "opcounters.command",
+        "opcountersRepl.insert",
+        "opcountersRepl.query",
+        "opcountersRepl.update",
+        "opcountersRepl.delete",
+        "opcountersRepl.getmore",
+        "opcountersRepl.command",
         "asserts.regular",
         "asserts.warning",
         "asserts.msg",
@@ -220,7 +226,7 @@ def check(self, instance):

                 # If we have both we can compute a lag time
                 if current is not None and primary is not None:
-                    lag = current['optimeDate'] - primary['optimeDate']
+                    lag = primary['optimeDate'] - current['optimeDate']
                     # Python 2.7 has this built in, python < 2.7 don't...
                     if hasattr(lag,'total_seconds'):
                         data['replicationLag'] = lag.total_seconds()
diff --git a/checks.d/tokumx.py b/checks.d/tokumx.py
new file mode 100644
index 0000000000..7ec4674ffb
--- /dev/null
+++ b/checks.d/tokumx.py
@@ -0,0 +1,409 @@
+import re
+import types
+import time
+
+from checks import AgentCheck
+from util import get_hostname
+
+# When running with pymongo < 2.0
+# Not the full spec for mongo URIs -- just extract username and password
+# http://www.mongodb.org/display/DOCS/connections6
+mongo_uri_re = re.compile(r'mongodb://(?P<username>[^:@]+):(?P<password>[^:@]+)@.*')
+
+DEFAULT_TIMEOUT = 10
+
+class TokuMX(AgentCheck):
+
+    GAUGES = [
+        "indexCounters.btree.missRatio",
+        "globalLock.ratio",
+        "connections.current",
+        "connections.available",
+        "mem.resident",
+        "mem.virtual",
+        "mem.mapped",
+        "cursors.totalOpen",
+        "cursors.timedOut",
+        "uptime",
+
+        "stats.indexes",
+        "stats.indexSize",
+        "stats.objects",
+        "stats.dataSize",
+        "stats.storageSize",
+
+        "replSet.health",
+        "replSet.state",
+        "replSet.replicationLag",
+        "metrics.repl.buffer.count",
+        "metrics.repl.buffer.maxSizeBytes",
+        "metrics.repl.buffer.sizeBytes",
+
+        "ft.cachetable.size.current",
+        "ft.cachetable.size.writing",
+        "ft.cachetable.size.limit",
+        "ft.locktree.size.current",
+        "ft.locktree.size.limit",
+        "ft.compressionRatio.leaf",
+        "ft.compressionRatio.nonleaf",
+        "ft.compressionRatio.overall",
+        "ft.checkpoint.lastComplete.time",
+
+        "ft.alerts.locktreeRequestsPending",
+        "ft.alerts.checkpointFailures",
+    ]
+
+    RATES = [
+        "indexCounters.btree.accesses",
+        "indexCounters.btree.hits",
+        "indexCounters.btree.misses",
+        "opcounters.insert",
+        "opcounters.query",
+        "opcounters.update",
+        "opcounters.delete",
+        "opcounters.getmore",
+        "opcounters.command",
+        "opcountersRepl.insert",
+        "opcountersRepl.query",
+        "opcountersRepl.update",
+        "opcountersRepl.delete",
+        "opcountersRepl.getmore",
+        "opcountersRepl.command",
+        "asserts.regular",
+        "asserts.warning",
+        "asserts.msg",
+        "asserts.user",
+        "asserts.rollovers",
+        "metrics.document.deleted",
+        "metrics.document.inserted",
+        "metrics.document.returned",
+        "metrics.document.updated",
+        "metrics.getLastError.wtime.num",
+        "metrics.getLastError.wtime.totalMillis",
+        "metrics.getLastError.wtimeouts",
+        "metrics.operation.fastmod",
+        "metrics.operation.idhack",
+        "metrics.operation.scanAndOrder",
+        "metrics.queryExecutor.scanned",
+        "metrics.record.moves",
+        "metrics.repl.apply.batches.num",
+        "metrics.repl.apply.batches.totalMillis",
+        "metrics.repl.apply.ops",
+        "metrics.repl.network.bytes",
+        "metrics.repl.network.getmores.num",
+        "metrics.repl.network.getmores.totalMillis",
+        "metrics.repl.network.ops",
+        "metrics.repl.network.readersCreated",
+        "metrics.repl.oplog.insert.num",
+        "metrics.repl.oplog.insert.totalMillis",
+        "metrics.repl.oplog.insertBytes",
+        "metrics.ttl.deletedDocuments",
+        "metrics.ttl.passes",
+
+        "ft.fsync.count",
+        "ft.fsync.time",
+        "ft.log.count",
+        "ft.log.time",
+        "ft.log.bytes",
+        "ft.cachetable.miss.count",
+        "ft.cachetable.miss.time",
+        "ft.cachetable.miss.full.count",
+        "ft.cachetable.miss.full.time",
+        "ft.cachetable.miss.partial.count",
+        "ft.cachetable.miss.partial.time",
+        "ft.cachetable.evictions.partial.nonleaf.clean.count",
+        "ft.cachetable.evictions.partial.nonleaf.clean.bytes",
+        "ft.cachetable.evictions.partial.leaf.clean.count",
+        "ft.cachetable.evictions.partial.leaf.clean.bytes",
+        "ft.cachetable.evictions.full.nonleaf.clean.count",
+        "ft.cachetable.evictions.full.nonleaf.clean.bytes",
+        "ft.cachetable.evictions.full.nonleaf.dirty.count",
+        "ft.cachetable.evictions.full.nonleaf.dirty.bytes",
+        "ft.cachetable.evictions.full.nonleaf.dirty.time",
+        "ft.cachetable.evictions.full.leaf.clean.count",
+        "ft.cachetable.evictions.full.leaf.clean.bytes",
+        "ft.cachetable.evictions.full.leaf.dirty.count",
+        "ft.cachetable.evictions.full.leaf.dirty.bytes",
+        "ft.cachetable.evictions.full.leaf.dirty.time",
+        "ft.checkpoint.count",
+        "ft.checkpoint.time",
+        "ft.checkpoint.begin.time",
+        "ft.checkpoint.write.nonleaf.count",
+        "ft.checkpoint.write.nonleaf.time",
+        "ft.checkpoint.write.nonleaf.bytes.uncompressed",
+        "ft.checkpoint.write.nonleaf.bytes.compressed",
+        "ft.checkpoint.write.leaf.count",
+        "ft.checkpoint.write.leaf.time",
+        "ft.checkpoint.write.leaf.bytes.uncompressed",
+        "ft.checkpoint.write.leaf.bytes.compressed",
+        "ft.serializeTime.nonleaf.serialize",
+        "ft.serializeTime.nonleaf.compress",
+        "ft.serializeTime.nonleaf.decompress",
+        "ft.serializeTime.nonleaf.deserialize",
+        "ft.serializeTime.leaf.serialize",
+        "ft.serializeTime.leaf.compress",
+        "ft.serializeTime.leaf.decompress",
+        "ft.serializeTime.leaf.deserialize",
+
+        "ft.alerts.longWaitEvents.logBufferWait",
+        "ft.alerts.longWaitEvents.fsync.count",
+        "ft.alerts.longWaitEvents.fsync.time",
+        "ft.alerts.longWaitEvents.cachePressure.count",
+        "ft.alerts.longWaitEvents.cachePressure.time",
+        "ft.alerts.longWaitEvents.checkpointBegin.count",
+        "ft.alerts.longWaitEvents.checkpointBegin.time",
+        "ft.alerts.longWaitEvents.locktreeWait.count",
+        "ft.alerts.longWaitEvents.locktreeWait.time",
+        "ft.alerts.longWaitEvents.locktreeWaitEscalation.count",
+        "ft.alerts.longWaitEvents.locktreeWaitEscalation.time",
+    ]
+
+    METRICS = GAUGES + RATES
+
+    def __init__(self, name, init_config, agentConfig):
+        AgentCheck.__init__(self, name, init_config, agentConfig)
+        self._last_state_by_server = {}
+
+    def get_library_versions(self):
+        try:
+            import pymongo
+            version = pymongo.version
+        except ImportError:
+            version = "Not Found"
+        except AttributeError:
+            version = "Unknown"
+
+        return {"pymongo": version}
+
+    def check_last_state(self, state, server, agentConfig):
+        if self._last_state_by_server.get(server, -1) != state:
+            self._last_state_by_server[server] = state
+            return self.create_event(state, server, agentConfig)
+
+    def create_event(self, state, server, agentConfig):
+        """Create an event with a message describing the replication
+            state of a mongo node"""
+
+        def get_state_description(state):
+            if state == 0: return 'Starting Up'
+            elif state == 1: return 'Primary'
+            elif state == 2: return 'Secondary'
+            elif state == 3: return 'Recovering'
+            elif state == 4: return 'Fatal'
+            elif state == 5: return 'Starting up (initial sync)'
+            elif state == 6: return 'Unknown'
+            elif state == 7: return 'Arbiter'
+            elif state == 8: return 'Down'
+            elif state == 9: return 'Rollback'
+
+        status = get_state_description(state)
+        hostname = get_hostname(agentConfig)
+        msg_title = "%s is %s" % (server, status)
+        msg = "TokuMX %s just reported as %s" % (server, status)
+
+        self.event({
+            'timestamp': int(time.time()),
+            'event_type': 'tokumx',
+            'api_key': agentConfig['api_key'],
+            'msg_title': msg_title,
+            'msg_text': msg,
+            'host': hostname
+        })
+
+    def check(self, instance):
+        """
+        Returns a dictionary that looks a lot like what's sent back by db.serverStatus()
+        """
+        if 'server' not in instance:
+            self.log.warn("Missing 'server' in tokumx config")
+            return
+
+        server = instance['server']
+
+        ssl_params = {
+            'ssl': instance.get('ssl', None),
+            'ssl_keyfile': instance.get('ssl_keyfile', None),
+            'ssl_certfile': instance.get('ssl_certfile', None),
+            'ssl_cert_reqs': instance.get('ssl_cert_reqs', None),
+            'ssl_ca_certs': instance.get('ssl_ca_certs', None)
+        }
+
+        for key, param in ssl_params.items():
+            if param is None:
+                del ssl_params[key]
+
+        tags = instance.get('tags', [])
+        tags.append('server:%s' % server)
+        # de-dupe tags to avoid a memory leak
+        tags = list(set(tags))
+
+        try:
+            from pymongo import MongoClient, ReadPreference
+        except ImportError:
+            self.log.error('tokumx.yaml exists but pymongo module can not be imported. Skipping check.')
+            raise Exception('Python PyMongo Module can not be imported. Please check the installation instruction on the Datadog Website')
+
+        try:
+            from pymongo import uri_parser
+            # The server is configured as a URL, mongodb://user:pass@server/db
+            parsed = uri_parser.parse_uri(server)
+        except ImportError:
+            # uri_parser is pymongo 2.0+
+            matches = mongo_uri_re.match(server)
+            if matches:
+                parsed = matches.groupdict()
+            else:
+                parsed = {}
+        username = parsed.get('username')
+        password = parsed.get('password')
+        db_name = parsed.get('database')
+
+        if not db_name:
+            self.log.info('No TokuMX database found in URI. Defaulting to admin.')
+            db_name = 'admin'
+
+        do_auth = True
+        if username is None or password is None:
+            self.log.debug("TokuMX: cannot extract username and password from config %s" % server)
+            do_auth = False
+
+        conn = MongoClient(server, socketTimeoutMS=DEFAULT_TIMEOUT*1000, **ssl_params)
+        db = conn[db_name]
+        if do_auth:
+            if not db.authenticate(username, password):
+                self.log.error("TokuMX: cannot connect with config %s" % server)
+
+        if conn.is_mongos:
+            tags.append('role:mongos')
+            config = conn['config']
+            agg_result = config['chunks'].aggregate([{'$group': {'_id': {'ns': '$ns', 'shard': '$shard'}, 'count': {'$sum': 1}}}])
+            if agg_result['ok']:
+                for doc in agg_result['result']:
+                    chunk_tags = list(tags)
+                    parts = doc['_id']['ns'].split('.', 1)
+                    chunk_tags.append('db:%s' % parts[0])
+                    chunk_tags.append('coll:%s' % parts[1])
+                    chunk_tags.append('shard:%s' % doc['_id']['shard'])
+                    shard_doc = config['shards'].find_one(doc['_id']['shard'])
+                    host_parts = shard_doc['host'].split('/', 1)
+                    if len(host_parts) == 2:
+                        chunk_tags.append('replset:%s' % host_parts[0])
+                    self.gauge('tokumx.sharding.chunks', doc['count'], tags=chunk_tags)
+        else:
+            status = db["$cmd"].find_one({"serverStatus": 1})
+            status['stats'] = db.command('dbstats')
+
+            # Handle replica data, if any
+            # See http://www.mongodb.org/display/DOCS/Replica+Set+Commands#ReplicaSetCommands-replSetGetStatus
+            try:
+                data = {}
+
+                replSet = conn['admin'].command('replSetGetStatus')
+                if replSet:
+                    primary = None
+                    current = None
+
+                    # find nodes: master and current node (ourself)
+                    for member in replSet.get('members'):
+                        if member.get('self'):
+                            current = member
+                        if int(member.get('state')) == 1:
+                            primary = member
+
+                    # If we have both we can compute a lag time
+                    if current is not None and primary is not None:
+                        lag = primary['optimeDate'] - current['optimeDate']
+                        # Python 2.7 has this built in, python < 2.7 don't...
+                        if hasattr(lag,'total_seconds'):
+                            data['replicationLag'] = lag.total_seconds()
+                        else:
+                            data['replicationLag'] = (lag.microseconds + \
+                                (lag.seconds + lag.days * 24 * 3600) * 10**6) / 10.0**6
+
+                    if current is not None:
+                        data['health'] = current['health']
+
+                    tags.append('replset:%s' % replSet['set'])
+                    tags.append('replstate:%s' % current['stateStr'])
+                    if current['stateStr'] == 'PRIMARY':
+                        tags.append('role:primary')
+                    else:
+                        tags.append('role:secondary')
+                        conn.read_preference = ReadPreference.SECONDARY
+
+                    data['state'] = replSet['myState']
+                    self.check_last_state(data['state'], server, self.agentConfig)
+                    status['replSet'] = data
+            except Exception, e:
+                if "OperationFailure" in repr(e) and "replSetGetStatus" in str(e):
+                    pass
+                else:
+                    raise e
+
+            for dbname in conn.database_names():
+                db_tags = list(tags)
+                db_tags.append('db:%s' % dbname)
+                db = conn[dbname]
+                stats = db.command('dbstats')
+                for m, v in stats.items():
+                    if m in ['db', 'ok']:
+                        continue
+                    m = 'stats.db.%s' % m
+                    m = self.normalize(m, 'tokumx')
+                    self.gauge(m, v, db_tags)
+                for collname in db.collection_names(False):
+                    coll_tags = list(db_tags)
+                    coll_tags.append('coll:%s' % collname)
+                    stats = db.command('collStats', collname)
+                    for m, v in stats.items():
+                        if m in ['db', 'ok']:
+                            continue
+                        if m == 'indexDetails':
+                            for idx_stats in v:
+                                idx_tags = list(coll_tags)
+                                idx_tags.append('idx:%s' % idx_stats['name'])
+                                for k in ['count', 'size', 'avgObjSize', 'storageSize']:
+                                    mname = 'stats.idx.%s' % k
+                                    mname = self.normalize(mname, 'tokumx')
+                                    self.gauge(mname, idx_stats[k], tags=idx_tags)
+                                for k in ['queries', 'nscanned', 'nscannedObjects', 'inserts', 'deletes']:
+                                    mname = 'stats.idx.%s' % k
+                                    mname = self.normalize(mname, 'tokumx')
+                                    self.rate(mname, idx_stats[k], tags=idx_tags)
+                        else:
+                            m = 'stats.coll.%s' % m
+                            m = self.normalize(m, 'tokumx')
+                            self.gauge(m, v, coll_tags)
+
+            # If these keys exist, remove them for now as they cannot be serialized
+            try:
+                status['backgroundFlushing'].pop('last_finished')
+            except KeyError:
+                pass
+            try:
+                status.pop('localTime')
+            except KeyError:
+                pass
+
+            # Go through the metrics and save the values
+            for m in self.METRICS:
+                # each metric is of the form: x.y.z with z optional
+                # and can be found at status[x][y][z]
+                value = status
+                try:
+                    for c in m.split("."):
+                        value = value[c]
+                except KeyError:
+                    continue
+
+                # value is now status[x][y][z]
+                assert type(value) in (types.IntType, types.LongType, types.FloatType)
+
+                # Check if metric is a gauge or rate
+                if m in self.GAUGES:
+                    m = self.normalize(m, 'tokumx')
+                    self.gauge(m, value, tags=tags)
+
+                if m in self.RATES:
+                    m = self.normalize(m, 'tokumx') + "ps"
+                    self.rate(m, value, tags=tags)
diff --git a/conf.d/tokumx.yaml.example b/conf.d/tokumx.yaml.example
new file mode 100644
index 0000000000..3c6589b488
--- /dev/null
+++ b/conf.d/tokumx.yaml.example
@@ -0,0 +1,16 @@
+init_config:
+
+instances:
+#  - server: mongodb://localhost:27017
+#    tags:
+#      - optional_tag1
+#      - optional_tag2
+#
+#    Optional SSL parameters, see https://github.com/mongodb/mongo-python-driver/blob/2.6.3/pymongo/mongo_client.py#L193-L212
+#    for more details
+#
+#    ssl: False          # Optional (defaults to False)
+#    ssl_keyfile:        # Path to the private keyfile used to identify the local connection against mongod.
+#    ssl_certfile:       # Path to the certificate file used to identify the local connection against mongod.
+#    ssl_cert_reqs:      # Specifies whether a certificate is required from the other side of the connection, and whether it will be validated if provided.
+#    ssl_ca_certs:       # Path to the ca_certs file
diff --git a/tests/test_tokumx.py b/tests/test_tokumx.py
new file mode 100644
index 0000000000..3671ed1814
--- /dev/null
+++ b/tests/test_tokumx.py
@@ -0,0 +1,133 @@
+import unittest
+import logging
+import subprocess
+from tempfile import mkdtemp
+import time
+import socket
+
+import pymongo
+
+from tests.common import load_check
+
+PORT1 = 37017
+PORT2 = 37018
+MAX_WAIT = 150
+
+class TestTokuMX(unittest.TestCase):
+    def wait4mongo(self, process, port):
+        # Somehow process.communicate() hangs
+        out = process.stdout
+        loop = 0
+        while True:
+            l = out.readline()
+            if l.find("[initandlisten] waiting for connections on port") > -1:
+                break
+            else:
+                time.sleep(0.1)
+                loop += 1
+                if loop >= MAX_WAIT:
+                    break
+
+    def setUp(self):
+        self.agentConfig = {
+            'version': '0.1',
+            'api_key': 'toto'
+        }
+
+        # Initialize the check from checks.d
+        self.check = load_check('tokumx', {'init_config': {}, 'instances': {}}, self.agentConfig)
+
+        # Start 2 instances of TokuMX in a replica set
+        dir1 = mkdtemp()
+        dir2 = mkdtemp()
+        try:
+            self.p1 = subprocess.Popen(["mongod", "--dbpath", dir1, "--port", str(PORT1), "--replSet", "testset/%s:%d" % (socket.gethostname(), PORT2), "--rest"],
+                                       executable="mongod",
+                                       stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE)
+            # Sleep until mongo comes online
+            self.wait4mongo(self.p1, PORT1)
+            if self.p1:
+                # Set up replication
+                c1 = pymongo.Connection('localhost:%s' % PORT1, slave_okay=True)
+                self.p2 = subprocess.Popen(["mongod", "--dbpath", dir2, "--port", str(PORT2), "--replSet", "testset/%s:%d" % (socket.gethostname(), PORT1), "--rest"],
+                                           executable="mongod",
+                                           stdout=subprocess.PIPE,
+                                           stderr=subprocess.PIPE)
+                self.wait4mongo(self.p2, PORT2)
+                # Wait until all members are online
+                time.sleep(15)
+                c1.admin.command("replSetInitiate")
+                # Sleep for 30s until replication is stable
+                time.sleep(30)
+                x = c1.admin.command("replSetGetStatus")
+                assert pymongo.Connection('localhost:%s' % PORT2)
+        except Exception:
+            logging.getLogger().exception("Cannot instantiate mongod properly")
+
+    def tearDown(self):
+        try:
+            if "p1" in dir(self): self.p1.terminate()
+            if "p2" in dir(self): self.p2.terminate()
+        except Exception:
+            logging.getLogger().exception("Cannot terminate mongod instances")
+
+    def testTokuMXCheck(self):
+        self.config = {
+            'instances': [{
+                'server': "mongodb://localhost:%s/test" % PORT1
+            },
+            {
+                'server': "mongodb://localhost:%s/test" % PORT2
+            }]
+        }
+
+        # Test tokumx with checks.d
+        self.check = load_check('tokumx', self.config, self.agentConfig)
+
+        # Run the check against our running server
+        self.check.check(self.config['instances'][0])
+        # Sleep for 1 second so the rate interval >=1
+        time.sleep(1)
+        # Run the check again so we get the rates
+        self.check.check(self.config['instances'][0])
+
+        # Metric assertions
+        metrics = self.check.get_metrics()
+        assert metrics
+        self.assertTrue(type(metrics) == type([]))
+        self.assertTrue(len(metrics) > 0)
+
+        metric_val_checks = {
+            'tokumx.connections.current': lambda x: x >= 1,
+            'tokumx.connections.available': lambda x: x >= 1,
+            'tokumx.uptime': lambda x: x >= 0,
+            'tokumx.ft.cachetable.size.current': lambda x: x > 0,
+            'tokumx.ft.cachetable.size.limit': lambda x: x > 0,
+        }
+
+        for m in metrics:
+            metric_name = m[0]
+            if metric_name in metric_val_checks:
+                self.assertTrue(metric_val_checks[metric_name](m[2]))
+
+        # Run the check against the second running server
+        self.check.check(self.config['instances'][1])
+        # Sleep for 1 second so the rate interval >=1
+        time.sleep(1)
+        # Run the check again so we get the rates
+        self.check.check(self.config['instances'][1])
+
+        # Metric assertions
+        metrics = self.check.get_metrics()
+        assert metrics
+        self.assertTrue(type(metrics) == type([]))
+        self.assertTrue(len(metrics) > 0)
+
+        for m in metrics:
+            metric_name = m[0]
+            if metric_name in metric_val_checks:
+                self.assertTrue(metric_val_checks[metric_name](m[2]))
+
+if __name__ == '__main__':
+    unittest.main()