Skip to content

Commit

Permalink
[dev] Add an Agent Developer Mode for collecting fine-grained performance metrics
Browse files Browse the repository at this point in the history
Loading branch information
talwai committed May 14, 2015
1 parent 36583cc commit 915ab28
Show file tree
Hide file tree
Showing 12 changed files with 742 additions and 66 deletions.
75 changes: 40 additions & 35 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# Custom modules
from checks.collector import Collector
from checks.check_status import CollectorStatus
from utils.profile import pretty_statistics
from config import (
get_confd_path,
get_config,
Expand All @@ -42,6 +43,7 @@
)
from utils.pidfile import PidFile
from utils.flare import configcheck, Flare
from utils.profile import AgentProfiler

# Constants
PID_NAME = "dd-agent"
Expand All @@ -50,6 +52,8 @@
START_COMMANDS = ['start', 'restart', 'foreground']
DD_AGENT_COMMANDS = ['check', 'flare', 'jmx']

DEFAULT_COLLECTOR_PROFILE_INTERVAL = 20

# Globals
log = logging.getLogger('collector')

Expand All @@ -59,11 +63,12 @@ class Agent(Daemon):
The agent class is a daemon that runs the collector in a background process.
"""

def __init__(self, pidfile, autorestart, start_event=True):
    def __init__(self, pidfile, autorestart, start_event=True, in_developer_mode=False):
        """
        :param pidfile: path to the daemon pid file, forwarded to `Daemon.__init__`
        :param autorestart: forwarded to `Daemon.__init__`; also checked each
            loop iteration to decide whether the agent should restart itself
        :param start_event: forwarded to `Collector.run` on each collection run
        :param in_developer_mode: when True, the run loop enables the
            `AgentProfiler` to collect fine-grained performance metrics
        """
        Daemon.__init__(self, pidfile, autorestart=autorestart)
        # Main-loop flag; cleared by the SIGTERM handler to stop the daemon.
        self.run_forever = True
        # Created lazily in run() once config/emitters/hostname are known.
        self.collector = None
        self.start_event = start_event
        self.in_developer_mode = in_developer_mode

def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping run loop.")
Expand Down Expand Up @@ -110,6 +115,10 @@ def run(self, config=None):

self.collector = Collector(agentConfig, emitters, systemStats, hostname)

# In developer mode, the number of runs to be included in a single collector profile
collector_profile_interval = agentConfig.get('collector_profile_interval',
DEFAULT_COLLECTOR_PROFILE_INTERVAL)

# Configure the watchdog.
check_frequency = int(agentConfig['check_freq'])
watchdog = self._get_watchdog(check_frequency, agentConfig)
Expand All @@ -118,36 +127,30 @@ def run(self, config=None):
self.restart_interval = int(agentConfig.get('restart_interval', RESTART_INTERVAL))
self.agent_start = time.time()

profiled = False
collector_profiled_runs = 0

# Run the main loop.
while self.run_forever:

# enable profiler if needed
profiled = False
if agentConfig.get('profile', False) and agentConfig.get('profile').lower() == 'yes':
# Setup profiling if necessary
if self.in_developer_mode and not profiled:
try:
import cProfile
profiler = cProfile.Profile()
profiler = AgentProfiler()
profiler.enable_profiling()
profiled = True
profiler.enable()
log.debug("Agent profiling is enabled")
except Exception:
log.warn("Cannot enable profiler")
except Exception as e:
log.warn("Cannot enable profiler: %s" % str(e))

# Do the work.
self.collector.run(checksd=checksd, start_event=self.start_event)

# disable profiler and printout stats to stdout
if agentConfig.get('profile', False) and agentConfig.get('profile').lower() == 'yes' and profiled:
try:
profiler.disable()
import pstats
from cStringIO import StringIO
s = StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats("cumulative")
ps.print_stats()
log.debug(s.getvalue())
except Exception:
log.warn("Cannot disable profiler")
if profiled:
if collector_profiled_runs >= collector_profile_interval:
try:
profiler.disable_profiling()
profiled = False
collector_profiled_runs = 0
except Exception as e:
log.warn("Cannot disable profiler: %s" % str(e))

# Check if we should restart.
if self.autorestart and self._should_restart():
Expand All @@ -158,8 +161,9 @@ def run(self, config=None):
if self.run_forever:
if watchdog:
watchdog.reset()
if profiled:
collector_profiled_runs += 1
time.sleep(check_frequency)

# Now clean-up.
try:
CollectorStatus.remove_latest_status()
Expand Down Expand Up @@ -212,7 +216,7 @@ def main():
agentConfig = get_config(options=options)
autorestart = agentConfig.get('autorestart', False)
hostname = get_hostname(agentConfig)

in_developer_mode = agentConfig.get('developer_mode')
COMMANDS_AGENT = [
'start',
'stop',
Expand Down Expand Up @@ -247,7 +251,7 @@ def main():
deprecate_old_command_line_tools()

if command in COMMANDS_AGENT:
agent = Agent(PidFile('dd-agent').get_path(), autorestart)
agent = Agent(PidFile('dd-agent').get_path(), autorestart, in_developer_mode=in_developer_mode)

if command in START_COMMANDS:
log.info('Agent version %s' % get_version())
Expand Down Expand Up @@ -301,17 +305,18 @@ def parent_func(): agent.start_event = False
checks = load_check_directory(agentConfig, hostname)
for check in checks['initialized_checks']:
if check.name == check_name:
check.run()
print check.get_metrics()
print check.get_events()
print check.get_service_checks()
if in_developer_mode:
check.run = AgentProfiler.wrap_profiling(check.run)

cs = Collector.run_single_check(check, verbose=True)
print CollectorStatus.render_check_status(cs)

if len(args) == 3 and args[2] == 'check_rate':
print "Running 2nd iteration to capture rate metrics"
time.sleep(1)
check.run()
print check.get_metrics()
print check.get_events()
print check.get_service_checks()
cs = Collector.run_single_check(check, verbose=True)
print CollectorStatus.render_check_status(cs)

check.stop()

elif 'configcheck' == command or 'configtest' == command:
Expand Down
151 changes: 151 additions & 0 deletions checks.d/agent_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import os
import threading

# 3p
try:
import psutil
except ImportError:
psutil = None

# project
from checks import AgentCheck
from checks.metric_types import MetricTypes
from config import _is_affirmative

MAX_THREADS_COUNT = 50
MAX_COLLECTION_TIME = 30
MAX_EMIT_TIME = 5
MAX_CPU_PCT = 10


class UnsupportedMetricType(Exception):
    """
    Raised by :class:`AgentMetrics` when a metric type outside of
    AgentMetrics.ALLOWED_METRIC_TYPES is requested for a statistic.
    """
    def __init__(self, metric_name, metric_type):
        Exception.__init__(
            self,
            'Unsupported Metric Type for {0} : {1}'.format(metric_name, metric_type)
        )

class AgentMetrics(AgentCheck):
    """
    New-style version of `CollectorMetrics`.
    Gets information about agent performance on every collector loop:
    psutil-based process statistics (developer mode only) plus the
    collection/emit timing context handed over by the collector.
    """

    def __init__(self, *args, **kwargs):
        AgentCheck.__init__(self, *args, **kwargs)
        # Latest payload and timing context handed over by the collector
        # via `set_metric_context`; read back in `check`.
        self._collector_payload = {}
        self._metric_context = {}

    def _psutil_config_to_stats(self, instance):
        """
        Reads `process_metrics` from the instance config (falling back to
        `init_config`) for `psutil` methods to call on the current process,
        calls those methods and stores the raw output.

        :param instance: the check instance configuration
        :returns: a tuple of (dict of statistic_name: value,
                  dict of statistic_name: metric type)
        """
        process_metrics = instance.get('process_metrics', self.init_config.get('process_metrics', None))
        if not process_metrics:
            self.log.error('No metrics configured for AgentMetrics check!')
            # Return a pair so the caller's tuple-unpack always works
            # (previously returned a bare `{}`, which raised ValueError).
            return {}, {}

        # Keep only the entries marked active; metric type defaults to GAUGE.
        methods, metric_types = zip(
            *[(p['name'], p.get('type', MetricTypes.GAUGE))
                for p in process_metrics if _is_affirmative(p.get('active'))]
        )

        names_to_metric_types = {}
        for method, metric_type in zip(methods, metric_types):
            names_to_metric_types[AgentCheck._get_statistic_name_from_method(method)] = metric_type

        stats = AgentCheck._collect_internal_stats(methods)
        return stats, names_to_metric_types

    def _send_single_metric(self, metric_name, metric_value, metric_type):
        """
        Report one sample as the configured metric type.

        :raises UnsupportedMetricType: if `metric_type` is neither GAUGE nor RATE
        """
        if metric_type == MetricTypes.GAUGE:
            self.gauge(metric_name, metric_value)
        elif metric_type == MetricTypes.RATE:
            self.rate(metric_name, metric_value)
        else:
            raise UnsupportedMetricType(metric_name, metric_type)

    def _register_psutil_metrics(self, stats, names_to_metric_types):
        """
        Saves sample metrics from psutil

        :param stats: a dictionary that looks like:
        {
            'memory_info': OrderedDict([('rss', 24395776), ('vms', 144666624)]),
            'io_counters': OrderedDict([('read_count', 4536),
                                        ('write_count', 100),
                                        ('read_bytes', 0),
                                        ('write_bytes', 61440)])
            ...
        }
        :param names_to_metric_types: statistic_name -> metric type mapping

        This creates a metric like `datadog.agent.collector.{key_1}.{key_2}`
        where key_1 is a top-level key in `stats`, and key_2 is a nested key.
        E.g. datadog.agent.collector.memory_info.rss
        """
        base_metric = 'datadog.agent.collector.{0}.{1}'
        # TODO: May have to call self.normalize(metric_name) to get a compliant name
        for k, v in stats.iteritems():
            metric_type = names_to_metric_types[k]
            if isinstance(v, dict):
                # Nested stats (e.g. memory_info) flatten into one metric per field.
                for _k, _v in v.iteritems():
                    full_metric_name = base_metric.format(k, _k)
                    self._send_single_metric(full_metric_name, _v, metric_type)
            else:
                full_metric_name = 'datadog.agent.collector.{0}'.format(k)
                self._send_single_metric(full_metric_name, v, metric_type)

    def set_metric_context(self, payload, context):
        # Called by the collector to hand over the latest payload and
        # its timing context before this check runs.
        self._collector_payload = payload
        self._metric_context = context

    def get_metric_context(self):
        return self._collector_payload, self._metric_context

    def check(self, instance):
        """
        Report agent health metrics: psutil process stats (developer mode),
        thread count, collection/emit times and collector CPU usage, each
        gated by its threshold unless developer mode forces reporting.
        """
        if self.in_developer_mode:
            stats, names_to_metric_types = self._psutil_config_to_stats(instance)
            self._register_psutil_metrics(stats, names_to_metric_types)

        payload, context = self.get_metric_context()
        collection_time = context.get('collection_time', None)
        emit_time = context.get('emit_time', None)
        cpu_time = context.get('cpu_time', None)

        # Hoisted: activeCount() was previously called three times per check.
        thread_count = threading.activeCount()
        if thread_count > MAX_THREADS_COUNT:
            self.gauge('datadog.agent.collector.threads.count', thread_count)
            self.log.info("Thread count is high: %d" % thread_count)

        if collection_time is not None:
            # Compare only once we know the value is present (avoids
            # comparing None against a number).
            collect_time_exceeds_threshold = collection_time > MAX_COLLECTION_TIME
            if collect_time_exceeds_threshold or self.in_developer_mode:
                self.gauge('datadog.agent.collector.collection.time', collection_time)
                if collect_time_exceeds_threshold:
                    self.log.info("Collection time (s) is high: %.1f, metrics count: %d, events count: %d"
                                  % (collection_time, len(payload['metrics']), len(payload['events'])))

        if emit_time is not None:
            emit_time_exceeds_threshold = emit_time > MAX_EMIT_TIME
            if emit_time_exceeds_threshold or self.in_developer_mode:
                self.gauge('datadog.agent.emitter.emit.time', emit_time)
                if emit_time_exceeds_threshold:
                    self.log.info("Emit time (s) is high: %.1f, metrics count: %d, events count: %d"
                                  % (emit_time, len(payload['metrics']), len(payload['events'])))

        if cpu_time is not None:
            try:
                # collection_time may be None or 0 here; the except below
                # covers the resulting TypeError/ZeroDivisionError.
                cpu_used_pct = 100.0 * float(cpu_time) / float(collection_time)
                if cpu_used_pct > MAX_CPU_PCT:
                    self.gauge('datadog.agent.collector.cpu.used', cpu_used_pct)
                    self.log.info("CPU consumed (%%) is high: %.1f, metrics count: %d, events count: %d"
                                  % (cpu_used_pct, len(payload['metrics']), len(payload['events'])))
            except Exception as e:
                self.log.debug("Couldn't compute cpu used by collector with values %s %s %s"
                               % (cpu_time, collection_time, str(e)))
Loading

0 comments on commit 915ab28

Please sign in to comment.