diff --git a/agent.py b/agent.py index 8029c20219..57405f946f 100755 --- a/agent.py +++ b/agent.py @@ -24,6 +24,7 @@ # Custom modules from checks.collector import Collector from checks.check_status import CollectorStatus +from utils.profile import pretty_statistics from config import ( get_confd_path, get_config, @@ -42,6 +43,7 @@ ) from utils.pidfile import PidFile from utils.flare import configcheck, Flare +from utils.profile import AgentProfiler # Constants PID_NAME = "dd-agent" @@ -50,6 +52,8 @@ START_COMMANDS = ['start', 'restart', 'foreground'] DD_AGENT_COMMANDS = ['check', 'flare', 'jmx'] +DEFAULT_COLLECTOR_PROFILE_INTERVAL = 20 + # Globals log = logging.getLogger('collector') @@ -59,11 +63,12 @@ class Agent(Daemon): The agent class is a daemon that runs the collector in a background process. """ - def __init__(self, pidfile, autorestart, start_event=True): + def __init__(self, pidfile, autorestart, start_event=True, in_developer_mode=False): Daemon.__init__(self, pidfile, autorestart=autorestart) self.run_forever = True self.collector = None self.start_event = start_event + self.in_developer_mode = in_developer_mode def _handle_sigterm(self, signum, frame): log.debug("Caught sigterm. Stopping run loop.") @@ -110,6 +115,10 @@ def run(self, config=None): self.collector = Collector(agentConfig, emitters, systemStats, hostname) + # In developer mode, the number of runs to be included in a single collector profile + collector_profile_interval = agentConfig.get('collector_profile_interval', + DEFAULT_COLLECTOR_PROFILE_INTERVAL) + # Configure the watchdog. check_frequency = int(agentConfig['check_freq']) watchdog = self._get_watchdog(check_frequency, agentConfig) @@ -118,36 +127,30 @@ def run(self, config=None): self.restart_interval = int(agentConfig.get('restart_interval', RESTART_INTERVAL)) self.agent_start = time.time() + profiled = False + collector_profiled_runs = 0 + # Run the main loop. while self.run_forever: - - # enable profiler if needed - profiled = False - if agentConfig.get('profile', False) and agentConfig.get('profile').lower() == 'yes': + # Setup profiling if necessary + if self.in_developer_mode and not profiled: try: - import cProfile - profiler = cProfile.Profile() + profiler = AgentProfiler() + profiler.enable_profiling() profiled = True - profiler.enable() - log.debug("Agent profiling is enabled") - except Exception: - log.warn("Cannot enable profiler") + except Exception as e: + log.warn("Cannot enable profiler: %s" % str(e)) # Do the work. self.collector.run(checksd=checksd, start_event=self.start_event) - - # disable profiler and printout stats to stdout - if agentConfig.get('profile', False) and agentConfig.get('profile').lower() == 'yes' and profiled: - try: - profiler.disable() - import pstats - from cStringIO import StringIO - s = StringIO() - ps = pstats.Stats(profiler, stream=s).sort_stats("cumulative") - ps.print_stats() - log.debug(s.getvalue()) - except Exception: - log.warn("Cannot disable profiler") + if profiled: + if collector_profiled_runs >= collector_profile_interval: + try: + profiler.disable_profiling() + profiled = False + collector_profiled_runs = 0 + except Exception as e: + log.warn("Cannot disable profiler: %s" % str(e)) # Check if we should restart. if self.autorestart and self._should_restart(): @@ -158,8 +161,9 @@ def run(self, config=None): if self.run_forever: if watchdog: watchdog.reset() + if profiled: + collector_profiled_runs += 1 time.sleep(check_frequency) - # Now clean-up. try: CollectorStatus.remove_latest_status() @@ -212,7 +216,7 @@ def main(): agentConfig = get_config(options=options) autorestart = agentConfig.get('autorestart', False) hostname = get_hostname(agentConfig) - + in_developer_mode = agentConfig.get('developer_mode') COMMANDS_AGENT = [ 'start', 'stop', @@ -247,7 +251,7 @@ def main(): deprecate_old_command_line_tools() if command in COMMANDS_AGENT: - agent = Agent(PidFile('dd-agent').get_path(), autorestart) + agent = Agent(PidFile('dd-agent').get_path(), autorestart, in_developer_mode=in_developer_mode) if command in START_COMMANDS: log.info('Agent version %s' % get_version()) @@ -301,17 +305,18 @@ def parent_func(): agent.start_event = False checks = load_check_directory(agentConfig, hostname) for check in checks['initialized_checks']: if check.name == check_name: - check.run() - print check.get_metrics() - print check.get_events() - print check.get_service_checks() + if in_developer_mode: + check.run = AgentProfiler.wrap_profiling(check.run) + + cs = Collector.run_single_check(check, verbose=True) + print CollectorStatus.render_check_status(cs) + if len(args) == 3 and args[2] == 'check_rate': print "Running 2nd iteration to capture rate metrics" time.sleep(1) - check.run() - print check.get_metrics() - print check.get_events() - print check.get_service_checks() + cs = Collector.run_single_check(check, verbose=True) + print CollectorStatus.render_check_status(cs) + check.stop() elif 'configcheck' == command or 'configtest' == command: diff --git a/checks.d/agent_metrics.py b/checks.d/agent_metrics.py new file mode 100644 index 0000000000..c6523f375c --- /dev/null +++ b/checks.d/agent_metrics.py @@ -0,0 +1,151 @@ +import os +import threading + +# 3p +try: + import psutil +except ImportError: + psutil = None + +# project +from checks import AgentCheck +from checks.metric_types import MetricTypes +from config import _is_affirmative + +MAX_THREADS_COUNT = 50 +MAX_COLLECTION_TIME = 30 +MAX_EMIT_TIME = 5 +MAX_CPU_PCT = 10 + + +class UnsupportedMetricType(Exception): + """ + Raised by :class:`AgentMetrics` when a metric type outside outside of AgentMetrics.ALLOWED_METRIC_TYPES + is requested for measurement of a particular statistic + """ + def __init__(self, metric_name, metric_type): + message = 'Unsupported Metric Type for {0} : {1}'.format(metric_name, metric_type) + Exception.__init__(self, message) + +class AgentMetrics(AgentCheck): + """ + New-style version of `CollectorMetrics` + Gets information about agent performance on every collector loop + """ + + def __init__(self, *args, **kwargs): + AgentCheck.__init__(self, *args, **kwargs) + self._collector_payload = {} + self._metric_context = {} + + def _psutil_config_to_stats(self, instance): + """ + Reads `init_config` for `psutil` methods to call on the current process + Calls those methods and stores the raw output + + :returns a dictionary of statistic_name: value + """ + process_metrics = instance.get('process_metrics', self.init_config.get('process_metrics', None)) + if not process_metrics: + self.log.error('No metrics configured for AgentMetrics check!') + return {} + + methods, metric_types = zip( + *[(p['name'], p.get('type', MetricTypes.GAUGE)) + for p in process_metrics if _is_affirmative(p.get('active'))] + ) + + names_to_metric_types = {} + for i, m in enumerate(methods): + names_to_metric_types[AgentCheck._get_statistic_name_from_method(m)] = metric_types[i] + + stats = AgentCheck._collect_internal_stats(methods) + return stats, names_to_metric_types + + def _send_single_metric(self, metric_name, metric_value, metric_type): + if metric_type == MetricTypes.GAUGE: + self.gauge(metric_name, metric_value) + elif metric_type == MetricTypes.RATE: + self.rate(metric_name, metric_value) + else: + raise UnsupportedMetricType(metric_name, metric_type) + + def _register_psutil_metrics(self, stats, names_to_metric_types): + """ + Saves sample metrics from psutil + + :param stats: a dictionary that looks like: + { + 'memory_info': OrderedDict([('rss', 24395776), ('vms', 144666624)]), + 'io_counters': OrderedDict([('read_count', 4536), + ('write_count', 100), + ('read_bytes', 0), + ('write_bytes', 61440)]) + ... + } + + This creates a metric like `datadog.agent.collector.{key_1}.{key_2}` where key_1 is a top-level + key in `stats`, and key_2 is a nested key. + E.g. datadog.agent.collector.memory_info.rss + """ + + base_metric = 'datadog.agent.collector.{0}.{1}' + #TODO: May have to call self.normalize(metric_name) to get a compliant name + for k, v in stats.iteritems(): + metric_type = names_to_metric_types[k] + if isinstance(v, dict): + for _k, _v in v.iteritems(): + full_metric_name = base_metric.format(k, _k) + self._send_single_metric(full_metric_name, _v, metric_type) + else: + full_metric_name = 'datadog.agent.collector.{0}'.format(k) + self._send_single_metric(full_metric_name, v, metric_type) + + def set_metric_context(self, payload, context): + self._collector_payload = payload + self._metric_context = context + + def get_metric_context(self): + return self._collector_payload, self._metric_context + + def check(self, instance): + if self.in_developer_mode: + stats, names_to_metric_types = self._psutil_config_to_stats(instance) + self._register_psutil_metrics(stats, names_to_metric_types) + + payload, context = self.get_metric_context() + collection_time = context.get('collection_time', None) + emit_time = context.get('emit_time', None) + cpu_time = context.get('cpu_time', None) + + if threading.activeCount() > MAX_THREADS_COUNT: + self.gauge('datadog.agent.collector.threads.count', threading.activeCount()) + self.log.info("Thread count is high: %d" % threading.activeCount()) + + collect_time_exceeds_threshold = collection_time > MAX_COLLECTION_TIME + if collection_time is not None and \ + (collect_time_exceeds_threshold or self.in_developer_mode): + + self.gauge('datadog.agent.collector.collection.time', collection_time) + if collect_time_exceeds_threshold: + self.log.info("Collection time (s) is high: %.1f, metrics count: %d, events count: %d" + % (collection_time, len(payload['metrics']), len(payload['events']))) + + emit_time_exceeds_threshold = emit_time > MAX_EMIT_TIME + if emit_time is not None and \ + (emit_time_exceeds_threshold or self.in_developer_mode): + self.gauge('datadog.agent.emitter.emit.time', emit_time) + if emit_time_exceeds_threshold: + self.log.info("Emit time (s) is high: %.1f, metrics count: %d, events count: %d" + % (emit_time, len(payload['metrics']), len(payload['events']))) + + if cpu_time is not None: + try: + cpu_used_pct = 100.0 * float(cpu_time)/float(collection_time) + if cpu_used_pct > MAX_CPU_PCT: + self.gauge('datadog.agent.collector.cpu.used', cpu_used_pct) + self.log.info("CPU consumed (%%) is high: %.1f, metrics count: %d, events count: %d" + % (cpu_used_pct, len(payload['metrics']), len(payload['events']))) + except Exception, e: + self.log.debug("Couldn't compute cpu used by collector with values %s %s %s" + % (cpu_time, collection_time, str(e))) diff --git a/checks/__init__.py b/checks/__init__.py index 89369ad1eb..77457cba0a 100644 --- a/checks/__init__.py +++ b/checks/__init__.py @@ -5,6 +5,7 @@ """ import logging +import numbers import re import socket import time @@ -13,18 +14,30 @@ import sys import traceback import copy +import timeit from pprint import pprint from collections import defaultdict from util import LaconicFilter, get_os, get_hostname, get_next_id, yLoader from config import get_confd_path from checks import check_status +from utils.profile import pretty_statistics # 3rd party import yaml +try: + import psutil +except ImportError: + psutil = None + log = logging.getLogger(__name__) +# Default methods run when collecting info about the agent in developer mode +DEFAULT_PSUTIL_METHODS = ['get_memory_info', 'get_io_counters'] + +AGENT_METRICS_CHECK_NAME = 'agent_metrics' + # Konstants class CheckException(Exception): pass class Infinity(CheckException): pass @@ -265,6 +278,7 @@ def get_metrics(self, expire=True): pass return metrics + class AgentCheck(object): OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3) @@ -286,6 +300,9 @@ def __init__(self, name, init_config, agentConfig, instances=None): self.name = name self.init_config = init_config or {} self.agentConfig = agentConfig + self.in_developer_mode = agentConfig.get('developer_mode') and psutil is not None + self._internal_profiling_stats = None + self.hostname = agentConfig.get('checksd_hostname') or get_hostname(agentConfig) self.log = logging.getLogger('%s.%s' % (__name__, name)) @@ -535,8 +552,59 @@ def get_warnings(self): self.warnings = [] return warnings + @staticmethod + def _get_statistic_name_from_method(method_name): + return method_name[4:] if method_name.startswith('get_') else method_name + + @staticmethod + def _collect_internal_stats(methods=None): + current_process = psutil.Process(os.getpid()) + + methods = methods or DEFAULT_PSUTIL_METHODS + filtered_methods = [m for m in methods if hasattr(current_process, m)] + + stats = {} + + for method in filtered_methods: + # Go from `get_memory_info` -> `memory_info` + stat_name = AgentCheck._get_statistic_name_from_method(method) + try: + raw_stats = getattr(current_process, method)() + try: + stats[stat_name] = raw_stats._asdict() + except AttributeError: + if isinstance(raw_stats, numbers.Number): + stats[stat_name] = raw_stats + else: + log.warn("Could not serialize output of {0} to dict".format(method)) + + except psutil.AccessDenied: + log.warn("Cannot call psutil method {} : Access Denied".format(method)) + + return stats + + def _set_internal_profiling_stats(self, before, after): + self._internal_profiling_stats = {'before': before, 'after': after} + + def _get_internal_profiling_stats(self): + """ + If in developer mode, return a dictionary of statistics about the check run + """ + stats = self._internal_profiling_stats + self._internal_profiling_stats = None + return stats + def run(self): """ Run all instances. """ + + # Store run statistics if needed + before, after = None, None + if self.in_developer_mode and self.name != AGENT_METRICS_CHECK_NAME: + try: + before = AgentCheck._collect_internal_stats() + except Exception: # It's fine if we can't collect stats for the run, just log and proceed + self.log.debug("Failed to collect Agent Stats before check {0}".format(self.name)) + instance_statuses = [] for i, instance in enumerate(self.instances): try: @@ -548,14 +616,28 @@ def run(self): continue self.last_collection_time[i] = now + + check_start_time = None + if self.in_developer_mode: + check_start_time = timeit.default_timer() self.check(copy.deepcopy(instance)) + + instance_check_stats = None + if check_start_time is not None: + instance_check_stats = {'run_time': timeit.default_timer() - check_start_time} + if self.has_warnings(): instance_status = check_status.InstanceStatus(i, check_status.STATUS_WARNING, - warnings=self.get_warnings() + warnings=self.get_warnings(), + instance_check_stats=instance_check_stats ) else: - instance_status = check_status.InstanceStatus(i, check_status.STATUS_OK) + instance_status = check_status.InstanceStatus( + i, + check_status.STATUS_OK, + instance_check_stats=instance_check_stats + ) except Exception, e: self.log.exception("Check '%s' instance #%s failed" % (self.name, i)) instance_status = check_status.InstanceStatus(i, @@ -564,6 +646,15 @@ def run(self): tb=traceback.format_exc() ) instance_statuses.append(instance_status) + + if self.in_developer_mode and self.name != AGENT_METRICS_CHECK_NAME: + try: + after = AgentCheck._collect_internal_stats() + self._set_internal_profiling_stats(before, after) + log.info("\n \t %s %s" % (self.name, pretty_statistics(self._internal_profiling_stats))) + except Exception: # It's fine if we can't collect stats for the run, just log and proceed + self.log.debug("Failed to collect Agent Stats after check {0}".format(self.name)) + return instance_statuses def check(self, instance): diff --git a/checks/check_status.py b/checks/check_status.py index b589f339e1..db2b8883ca 100644 --- a/checks/check_status.py +++ b/checks/check_status.py @@ -22,10 +22,13 @@ # project import config from config import get_config, get_jmx_status_path, _windows_commondata_path + from util import get_os, plural from utils.ntp import get_ntp_datadog_host from utils.platform import Platform from utils.pidfile import PidFile +from utils.profile import pretty_statistics + STATUS_OK = 'OK' STATUS_ERROR = 'ERROR' @@ -262,7 +265,8 @@ def _get_pickle_path(cls): class InstanceStatus(object): - def __init__(self, instance_id, status, error=None, tb=None, warnings=None, metric_count=None): + def __init__(self, instance_id, status, error=None, tb=None, warnings=None, metric_count=None, + instance_check_stats=None): self.instance_id = instance_id self.status = status if error is not None: @@ -272,6 +276,7 @@ def __init__(self, instance_id, status, error=None, tb=None, warnings=None, metr self.traceback = tb self.warnings = warnings self.metric_count = metric_count + self.instance_check_stats = instance_check_stats def has_error(self): return self.status == STATUS_ERROR @@ -285,7 +290,8 @@ class CheckStatus(object): def __init__(self, check_name, instance_statuses, metric_count=None, event_count=None, service_check_count=None, init_failed_error=None, init_failed_traceback=None, - library_versions=None, source_type_name=None): + library_versions=None, source_type_name=None, + check_stats=None): self.name = check_name self.source_type_name = source_type_name self.instance_statuses = instance_statuses @@ -295,6 +301,7 @@ def __init__(self, check_name, instance_statuses, metric_count=None, self.init_failed_error = init_failed_error self.init_failed_traceback = init_failed_traceback self.library_versions = library_versions + self.check_stats = check_stats @property def status(self): @@ -348,6 +355,78 @@ def status(self): def has_error(self): return self.status != STATUS_OK + @staticmethod + def check_status_lines(cs): + check_lines = [ + ' ' + cs.name, + ' ' + '-' * len(cs.name) + ] + if cs.init_failed_error: + check_lines.append(" - initialize check class [%s]: %s" % + (style(STATUS_ERROR, 'red'), + repr(cs.init_failed_error))) + if cs.init_failed_traceback: + check_lines.extend(' ' + line for line in + cs.init_failed_traceback.split('\n')) + else: + for s in cs.instance_statuses: + c = 'green' + if s.has_warnings(): + c = 'yellow' + if s.has_error(): + c = 'red' + line = " - instance #%s [%s]" % ( + s.instance_id, style(s.status, c)) + if s.has_error(): + line += u": %s" % s.error + if s.metric_count is not None: + line += " collected %s metrics" % s.metric_count + if s.instance_check_stats is not None: + line += " Last run duration: %s" % s.instance_check_stats.get('run_time') + + check_lines.append(line) + + if s.has_warnings(): + for warning in s.warnings: + warn = warning.split('\n') + if not len(warn): continue + check_lines.append(u" %s: %s" % + (style("Warning", 'yellow'), warn[0])) + check_lines.extend(u" %s" % l for l in + warn[1:]) + if s.traceback is not None: + check_lines.extend(' ' + line for line in + s.traceback.split('\n')) + + check_lines += [ + " - Collected %s metric%s, %s event%s & %s service check%s" % ( + cs.metric_count, plural(cs.metric_count), + cs.event_count, plural(cs.event_count), + cs.service_check_count, plural(cs.service_check_count)), + ] + + if cs.check_stats is not None: + check_lines += [ + " - Stats: %s" % pretty_statistics(cs.check_stats) + ] + + if cs.library_versions is not None: + check_lines += [ + " - Dependencies:"] + for library, version in cs.library_versions.iteritems(): + check_lines += [" - %s: %s" % (library, version)] + + check_lines += [""] + return check_lines + + @staticmethod + def render_check_status(cs): + indent = " " + lines = [ + indent + l for l in CollectorStatus.check_status_lines(cs) + ] + ["", ""] + return "\n".join(lines) + def body_lines(self): # Metadata whitelist metadata_whitelist = [ @@ -447,6 +526,8 @@ def body_lines(self): line += u": %s" % s.error if s.metric_count is not None: line += " collected %s metrics" % s.metric_count + if s.instance_check_stats is not None: + line += " Last run duration: %s" % s.instance_check_stats.get('run_time') check_lines.append(line) @@ -469,6 +550,11 @@ def body_lines(self): cs.service_check_count, plural(cs.service_check_count)), ] + if cs.check_stats is not None: + check_lines += [ + " - Stats: %s" % pretty_statistics(cs.check_stats) + ] + if cs.library_versions is not None: check_lines += [ " - Dependencies:"] diff --git a/checks/collector.py b/checks/collector.py index a3cf0b0b88..ef401ab919 100644 --- a/checks/collector.py +++ b/checks/collector.py @@ -7,6 +7,7 @@ import logging import datetime import subprocess +import pprint import modules @@ -16,8 +17,7 @@ import jmxfetch import checks.system.unix as u import checks.system.win32 as w32 -from checks import create_service_check, AgentCheck -from checks.agent_metrics import CollectorMetrics +from checks import create_service_check, AgentCheck, AGENT_METRICS_CHECK_NAME from checks.ganglia import Ganglia from checks.datadog import Dogstreams, DdForwarder from checks.check_status import CheckStatus, CollectorStatus, EmitterStatus, STATUS_OK, STATUS_ERROR @@ -99,8 +99,8 @@ def __init__(self, agentConfig, emitters, systemStats, hostname): self._dogstream = Dogstreams.init(log, self.agentConfig) self._ddforwarder = DdForwarder(log, self.agentConfig) - # Agent Metrics - self._agent_metrics = CollectorMetrics(log) + # Agent performance metrics check + self._agent_metrics = None self._metrics_checks = [] @@ -133,6 +133,10 @@ def stop(self): for check in self.initialized_checks_d: check.stop() + @staticmethod + def _stats_for_display(raw_stats): + return pprint.pformat(raw_stats, indent=4) + def run(self, checksd=None, start_event=True): """ Collect data from each check and submit their data. @@ -147,6 +151,15 @@ def run(self, checksd=None, start_event=True): self.initialized_checks_d = checksd['initialized_checks'] # is a list of AgentCheck instances self.init_failed_checks_d = checksd['init_failed_checks'] # is of type {check_name: {error, traceback}} + # Find the AgentMetrics check and pop it out + # This check must run at the end of the loop to collect info on agent performance + if not self._agent_metrics: + for check in self.initialized_checks_d: + if check.name == AGENT_METRICS_CHECK_NAME: + self._agent_metrics = check + self.initialized_checks_d.remove(check) + break + payload = self._build_payload(start_event=start_event) metrics = payload['metrics'] events = payload['events'] @@ -271,6 +284,8 @@ def run(self, checksd=None, start_event=True): event_count = 0 service_check_count = 0 check_start_time = time.time() + check_stats = None + try: # Run the check. instance_statuses = check.run() @@ -278,6 +293,7 @@ def run(self, checksd=None, start_event=True): # Collect the metrics and events. current_check_metrics = check.get_metrics() current_check_events = check.get_events() + check_stats = check._get_internal_profiling_stats() # Save them for the payload. metrics.extend(current_check_metrics) @@ -290,12 +306,17 @@ def run(self, checksd=None, start_event=True): # Save the status of the check. metric_count = len(current_check_metrics) event_count = len(current_check_events) + except Exception: log.exception("Error running check %s" % check.name) - check_status = CheckStatus(check.name, instance_statuses, metric_count, event_count, service_check_count, + check_status = CheckStatus( + check.name, instance_statuses, metric_count, + event_count, service_check_count, library_versions=check.get_library_info(), - source_type_name=check.SOURCE_TYPE_NAME or check.name) + source_type_name=check.SOURCE_TYPE_NAME or check.name, + check_stats=check_stats + ) # Service check for Agent checks failures service_check_tags = ["check:%s" % check.name] @@ -370,11 +391,30 @@ def run(self, checksd=None, start_event=True): collect_duration = timer.step() if self.os != 'windows': - payload['metrics'].extend(self._agent_metrics.check(payload, self.agentConfig, - collect_duration, self.emit_duration, time.clock() - cpu_clock)) + if self._agent_metrics is not None: + self._agent_metrics.set_metric_context(payload, { + 'collection_time': collect_duration, + 'emit_time': self.emit_duration, + 'cpu_time': time.clock() - cpu_clock + }) + self._agent_metrics.run() + agent_stats = self._agent_metrics.get_metrics() + payload['metrics'].extend(agent_stats) + # Dump the metrics to log when in developer mode + if self.agentConfig.get('developer_mode', False): + log.info("\n AGENT STATS: \n {0}".format(Collector._stats_for_display(agent_stats))) else: - payload['metrics'].extend(self._agent_metrics.check(payload, self.agentConfig, - collect_duration, self.emit_duration)) + if self._agent_metrics is not None: + self._agent_metrics.set_metric_context(payload, { + 'collection_time': collect_duration, + 'emit_time': self.emit_duration, + }) + self._agent_metrics.run() + agent_stats = self._agent_metrics.get_metrics() + payload['metrics'].extend(agent_stats) + # Dump the metrics to log when in developer mode + if self.agentConfig.get('developer_mode', False): + log.info("\n AGENT STATS: \n {0}".format(Collector._stats_for_display(agent_stats))) # Let's send our payload emitter_statuses = self._emit(payload) @@ -398,6 +438,49 @@ def run(self, checksd=None, start_event=True): return payload + @staticmethod + def run_single_check(check, verbose=True): + log.info("Running check %s" % check.name) + instance_statuses = [] + metric_count = 0 + event_count = 0 + service_check_count = 0 + check_start_time = time.time() + check_stats = None + + try: + # Run the check. + instance_statuses = check.run() + + # Collect the metrics and events. + current_check_metrics = check.get_metrics() + current_check_events = check.get_events() + current_service_checks = check.get_service_checks() + + check_stats = check._get_internal_profiling_stats() + + # Save the status of the check. + metric_count = len(current_check_metrics) + event_count = len(current_check_events) + service_check_count = len(current_service_checks) + + print "Metrics: \n{0}".format(pprint.pformat(current_check_metrics)) + print "Events: \n{0}".format(pprint.pformat(current_check_events)) + print "Service Checks: \n{0}".format(pprint.pformat(current_service_checks)) + + except Exception: + log.exception("Error running check %s" % check.name) + + check_status = CheckStatus( + check.name, instance_statuses, metric_count, + event_count, service_check_count, + library_versions=check.get_library_info(), + source_type_name=check.SOURCE_TYPE_NAME or check.name, + check_stats=check_stats + ) + + return check_status + def _emit(self, payload): """ Send the payload via the emitters. """ statuses = [] diff --git a/conf.d/agent_metrics.yaml.default b/conf.d/agent_metrics.yaml.default new file mode 100755 index 0000000000..3f068c897e --- /dev/null +++ b/conf.d/agent_metrics.yaml.default @@ -0,0 +1,17 @@ +init_config: + process_metrics: + - name: get_memory_info + type: gauge + active: yes + - name: get_io_counters + type: rate + active: yes + - name: get_num_threads + type: gauge + active: yes + - name: get_connections + type: gauge + active: no + +instances: + [{}] diff --git a/config.py b/config.py index afef58bf57..25d5e18a4b 100644 --- a/config.py +++ b/config.py @@ -57,12 +57,12 @@ 'nagios_perf_cfg' ] -DEFAULT_CHECKS = ("network", "ntp") LEGACY_DATADOG_URLS = [ "app.datadoghq.com", "app.datad0g.com", ] + class PathNotFound(Exception): pass @@ -80,6 +80,8 @@ def get_parsed_args(): parser.add_option('-v', '--verbose', action='store_true', default=False, dest='verbose', help='Print out stacktraces for errors in checks') + parser.add_option('-p', '--profile', action='store_true', default=False, + dest='profile', help='Enable Developer Mode') try: options, args = parser.parse_args() @@ -88,7 +90,8 @@ def get_parsed_args(): options, args = Values({'autorestart': False, 'dd_url': None, 'disable_dd':False, - 'use_forwarder': False}), [] + 'use_forwarder': False, + 'profile': False}), [] return options, args @@ -326,6 +329,15 @@ def get_config(parse_args=True, cfg_path=None, options=None): for option in config.options('Main'): agentConfig[option] = config.get('Main', option) + # Store developer mode setting in the agentConfig + in_developer_mode = None + if config.has_option('Main', 'developer_mode'): + agentConfig['developer_mode'] = _is_affirmative(config.get('Main', 'developer_mode')) + + # Allow an override with the --profile option + if options is not None and options.profile: + agentConfig['developer_mode'] = True + # # Core config # @@ -757,7 +769,7 @@ def load_check_directory(agentConfig, hostname): ''' Return the initialized checks from checks.d, and a mapping of checks that failed to initialize. Only checks that have a configuration file in conf.d will be returned. ''' - from checks import AgentCheck + from checks import AgentCheck, AGENT_METRICS_CHECK_NAME initialized_checks = {} init_failed_checks = {} @@ -799,22 +811,17 @@ def load_check_directory(agentConfig, hostname): # Let's see if there is a conf.d for this check conf_path = os.path.join(confd_path, '%s.yaml' % check_name) - - # Default checks are checks that are enabled by default - # They read their config from the "[CHECKNAME].yaml.default" file - if check_name in DEFAULT_CHECKS: - default_conf_path = os.path.join(confd_path, '%s.yaml.default' % check_name) - else: - default_conf_path = None - conf_exists = False if os.path.exists(conf_path): conf_exists = True + else: + log.debug("No configuration file for %s. Looking for defaults" % check_name) - elif not conf_exists and default_conf_path is not None: + # Default checks read their config from the "[CHECKNAME].yaml.default" file + default_conf_path = os.path.join(confd_path, '%s.yaml.default' % check_name) if not os.path.exists(default_conf_path): - log.error("Default configuration file {0} is missing".format(default_conf_path)) + log.debug("Default configuration file {0} is missing. Skipping check".format(default_conf_path)) continue conf_path = default_conf_path conf_exists = True @@ -912,7 +919,7 @@ def load_check_directory(agentConfig, hostname): log.debug('Loaded check.d/%s.py' % check_name) init_failed_checks.update(deprecated_checks) - log.info('initialized checks.d checks: %s' % initialized_checks.keys()) + log.info('initialized checks.d checks: %s' % [k for k in initialized_checks.keys() if k != AGENT_METRICS_CHECK_NAME]) log.info('initialization failed checks.d checks: %s' % init_failed_checks.keys()) return {'initialized_checks':initialized_checks.values(), 'init_failed_checks':init_failed_checks, diff --git a/datadog.conf.example b/datadog.conf.example index 19691628d8..f7759ff79e 100644 --- a/datadog.conf.example +++ b/datadog.conf.example @@ -10,7 +10,7 @@ dd_url: https://app.datadoghq.com # proxy_password: password # To be used with some proxys that return a 302 which make curl switch from POST to GET # See http://stackoverflow.com/questions/8156073/curl-violate-rfc-2616-10-3-2-and-switch-from-post-to-get -# proxy_forbid_method_switch: no +# proxy_forbid_method_switch: no # If you run the agent behind haproxy, you might want to set this to yes # skip_ssl_validation: no @@ -29,6 +29,12 @@ api_key: # Collect AWS EC2 custom tags as agent tags # collect_ec2_tags: no +# Enable Agent Developer Mode +# Agent Developer Mode collects and sends more fine-grained metrics about agent and check performance +# developer_mode: no +# In developer mode, the number of runs to be included in a single collector profile +# collector_profile_interval: 20 + # Collect instance metadata # The Agent will try to collect instance metadata for EC2 and GCE instances by # trying to connect to the local endpoint: http://169.254.169.254 diff --git a/tests/checks/mock/test_agent_metrics.py b/tests/checks/mock/test_agent_metrics.py new file mode 100644 index 0000000000..d4c369dbb5 --- /dev/null +++ b/tests/checks/mock/test_agent_metrics.py @@ -0,0 +1,126 @@ +import mock +from tests.checks.common import AgentCheckTest, load_check +from checks import AGENT_METRICS_CHECK_NAME + +MOCK_CONFIG = { + 'instances': [ + {'process_metrics': [ + { + 'name': 'get_memory_info', + 'type': 'gauge', + 'active': 'yes' + }, + { + 'name': 'get_cpu_times', + 'type': 'rate', + 'active': 'yes' + }, + ]}], + 'init_config': {} +} + +MOCK_CONFIG_2 = { + 'instances': [ + {'process_metrics': [ + { + 'name': 'get_memory_info', + 'type': 'gauge', + 'active': 'yes' + }, + { + 'name': 'get_non_existent_stat', + 'type': 'gauge', + 'active': 'yes' + }, + ]}], + 'init_config': {} +} + +AGENT_CONFIG_DEV_MODE = { + 'developer_mode': True +} + +AGENT_CONFIG_DEFAULT_MODE = {} + +MOCK_STATS = { + 'memory_info': dict([('rss', 16814080), ('vms', 74522624)]), + 'cpu_times': dict([('user', 0.041733968), ('system', 0.022306718)]) +} + +MOCK_NAMES_TO_METRIC_TYPES = { + 'memory_info': 'gauge', + 'cpu_times': 'gauge' +} + + +class AgentMetricsTestCase(AgentCheckTest): + + CHECK_NAME = AGENT_METRICS_CHECK_NAME + + def mock_psutil_config_to_stats(self): + return MOCK_STATS, MOCK_NAMES_TO_METRIC_TYPES + + ### Tests for Agent Developer Mode + def test_psutil_config_to_stats(self): + check = load_check(self.CHECK_NAME, MOCK_CONFIG, AGENT_CONFIG_DEV_MODE) + instance = MOCK_CONFIG.get('instances')[0] + + stats, names_to_metric_types = check._psutil_config_to_stats(instance) + self.assertIn('memory_info', names_to_metric_types) + self.assertEqual(names_to_metric_types['memory_info'], 'gauge') + + self.assertIn('cpu_times', names_to_metric_types) + self.assertEqual(names_to_metric_types['cpu_times'], 'rate') + + self.assertIn('memory_info', stats) + self.assertIn('cpu_times', stats) + + def test_send_single_metric(self): + check = load_check(self.CHECK_NAME, MOCK_CONFIG, AGENT_CONFIG_DEV_MODE) + check.gauge = mock.MagicMock() + check.rate = mock.MagicMock() + + check._send_single_metric('datadog.agent.collector.memory_info.vms', 16814081, 'gauge') + check.gauge.assert_called_with('datadog.agent.collector.memory_info.vms', 16814081) + + check._send_single_metric('datadog.agent.collector.memory_info.vms', 16814081, 'rate') + check.rate.assert_called_with('datadog.agent.collector.memory_info.vms', 16814081) + + self.assertRaises(Exception, check._send_single_metric, + *('datadog.agent.collector.memory_info.vms', 16814081, 'bogus')) + + def test_register_psutil_metrics(self): + check = load_check(self.CHECK_NAME, MOCK_CONFIG, AGENT_CONFIG_DEV_MODE) + check._register_psutil_metrics(MOCK_STATS, MOCK_NAMES_TO_METRIC_TYPES) + self.metrics = check.get_metrics() + + self.assertMetric('datadog.agent.collector.memory_info.rss', value=16814080) + self.assertMetric('datadog.agent.collector.memory_info.vms', value=74522624) + + def test_bad_process_metric_check(self): + ''' Tests that a bad configuration option for `process_metrics` gets ignored ''' + check = load_check(self.CHECK_NAME, MOCK_CONFIG_2, AGENT_CONFIG_DEV_MODE) + instance = MOCK_CONFIG.get('instances')[0] + stats, names_to_metric_types = check._psutil_config_to_stats(instance) + + self.assertIn('memory_info', names_to_metric_types) + self.assertEqual(names_to_metric_types['memory_info'], 'gauge') + + self.assertNotIn('non_existent_stat', names_to_metric_types) + + self.assertIn('memory_info', stats) + self.assertNotIn('non_existent_stat', stats) + + ### Tests for Agent Default Mode + def test_no_process_metrics_collected(self): + ''' Test that additional process metrics are not collected when in default mode ''' + mocks = { + '_register_psutil_metrics': mock.MagicMock(side_effect=AssertionError), + '_psutil_config_to_stats': mock.MagicMock(side_effect=AssertionError), + } + + self.run_check(MOCK_CONFIG, mocks=mocks) + + +if __name__ == '__main__': + import unittest; unittest.main() diff --git a/tests/core/test_config.py b/tests/core/test_config.py index 3c86d7f956..99dc22c4a9 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -6,11 +6,13 @@ import unittest # project -from config import get_config, load_check_directory, DEFAULT_CHECKS +from config import get_config, load_check_directory from util import is_valid_hostname, windows_friendly_colon_split from utils.pidfile import PidFile from utils.platform import Platform +# No more hardcoded default checks +DEFAULT_CHECKS = [] class TestConfig(unittest.TestCase): def testWhiteSpaceConfig(self): diff --git a/utils/profile.py b/utils/profile.py new file mode 100644 index 0000000000..6c58f909bf --- /dev/null +++ b/utils/profile.py @@ -0,0 +1,101 @@ +import os +import logging +import cProfile +import pstats +from cStringIO import StringIO + +log = logging.getLogger('collector') + +class AgentProfiler(object): + PSTATS_LIMIT = 20 + DUMP_TO_FILE = True + STATS_DUMP_FILE = './collector-stats.dmp' + + def __init__(self): + self._enabled = False + self._profiler = None + + def enable_profiling(self): + """ + Enable the profiler + """ + if not self._profiler: + self._profiler = cProfile.Profile() + + self._profiler.enable() + log.debug("Agent profiling is enabled") + + def disable_profiling(self): + """ + Disable the profiler, and if necessary dump a truncated pstats output + """ + self._profiler.disable() + s = StringIO() + ps = pstats.Stats(self._profiler, stream=s).sort_stats("cumulative") + ps.print_stats(self.PSTATS_LIMIT) + log.debug(s.getvalue()) + log.debug("Agent profiling is disabled") + if self.DUMP_TO_FILE: + log.debug("Pstats dumps are enabled. Dumping pstats output to {0}" + .format(self.STATS_DUMP_FILE)) + ps.dump_stats(self.STATS_DUMP_FILE) + + @staticmethod + def wrap_profiling(func): + """ + Wraps the function call in a cProfile run, processing and logging the output with pstats.Stats + Useful for profiling individual checks. + + :param func: The function to profile + """ + def wrapped_func(*args, **kwargs): + try: + import cProfile + profiler = cProfile.Profile() + profiler.enable() + log.debug("Agent profiling is enabled") + except Exception: + log.warn("Cannot enable profiler") + + # Catch any return value before disabling profiler + ret_val = func(*args, **kwargs) + + # disable profiler and printout stats to stdout + try: + profiler.disable() + import pstats + from cStringIO import StringIO + s = StringIO() + ps = pstats.Stats(profiler, stream=s).sort_stats("cumulative") + ps.print_stats(AgentProfiler.PSTATS_LIMIT) + log.info(s.getvalue()) + except Exception: + log.warn("Cannot disable profiler") + + return ret_val + + return wrapped_func + +def pretty_statistics(stats): + #FIXME: This should really be clever enough to handle more varied statistics + # Right now memory_info is the only one that we will predictably have 'before' and 'after' + # details about + + before = stats.get('before') + after = stats.get('after') + + mem_before = before.get('memory_info') + mem_after = after.get('memory_info') + + if mem_before and mem_after: + return """ + Memory Before (RSS): {0} + Memory After (RSS): {1} + Difference (RSS): {2} + Memory Before (VMS): {3} + Memory After (VMS): {4} + Difference (VMS): {5} + """.format(mem_before['rss'], mem_after['rss'], mem_after['rss'] - mem_before['rss'], + mem_before['vms'], mem_after['vms'], mem_after['vms'] - mem_before['vms']) + else: + return "" diff --git a/win32/agent.py b/win32/agent.py index 3e8f157060..c03c312701 100644 --- a/win32/agent.py +++ b/win32/agent.py @@ -52,7 +52,8 @@ def __init__(self, args): 'autorestart': False, 'dd_url': None, 'use_forwarder': True, - 'disabled_dd': False + 'disabled_dd': False, + 'profile': False }), [] agentConfig = get_config(parse_args=False, options=opts) self.hostname = get_hostname(agentConfig)