Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Talwai/agent dev mode #1577

Merged
merged 1 commit into from
May 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 40 additions & 35 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# Custom modules
from checks.collector import Collector
from checks.check_status import CollectorStatus
from utils.profile import pretty_statistics
from config import (
get_confd_path,
get_config,
Expand All @@ -42,6 +43,7 @@
)
from utils.pidfile import PidFile
from utils.flare import configcheck, Flare
from utils.profile import AgentProfiler

# Constants
PID_NAME = "dd-agent"
Expand All @@ -50,6 +52,8 @@
START_COMMANDS = ['start', 'restart', 'foreground']
DD_AGENT_COMMANDS = ['check', 'flare', 'jmx']

DEFAULT_COLLECTOR_PROFILE_INTERVAL = 20

# Globals
log = logging.getLogger('collector')

Expand All @@ -59,11 +63,12 @@ class Agent(Daemon):
The agent class is a daemon that runs the collector in a background process.
"""

def __init__(self, pidfile, autorestart, start_event=True):
def __init__(self, pidfile, autorestart, start_event=True, in_developer_mode=False):
    # pidfile: path to the daemon's pid file (forwarded to Daemon).
    # autorestart: whether the daemon should restart itself (forwarded to Daemon).
    # start_event: forwarded to Collector.run on each loop iteration —
    #   presumably controls emission of an agent-start event; confirm in Collector.
    # in_developer_mode: enables per-run profiling of the collector loop
    #   (see the AgentProfiler setup in run()).
    Daemon.__init__(self, pidfile, autorestart=autorestart)
    self.run_forever = True  # run loop flag; cleared when SIGTERM is caught
    self.collector = None  # created lazily in run()
    self.start_event = start_event
    self.in_developer_mode = in_developer_mode

def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping run loop.")
Expand Down Expand Up @@ -110,6 +115,10 @@ def run(self, config=None):

self.collector = Collector(agentConfig, emitters, systemStats, hostname)

# In developer mode, the number of runs to be included in a single collector profile
collector_profile_interval = agentConfig.get('collector_profile_interval',
DEFAULT_COLLECTOR_PROFILE_INTERVAL)

# Configure the watchdog.
check_frequency = int(agentConfig['check_freq'])
watchdog = self._get_watchdog(check_frequency, agentConfig)
Expand All @@ -118,36 +127,30 @@ def run(self, config=None):
self.restart_interval = int(agentConfig.get('restart_interval', RESTART_INTERVAL))
self.agent_start = time.time()

profiled = False
collector_profiled_runs = 0

# Run the main loop.
while self.run_forever:

# enable profiler if needed
profiled = False
if agentConfig.get('profile', False) and agentConfig.get('profile').lower() == 'yes':
# Setup profiling if necessary
if self.in_developer_mode and not profiled:
try:
import cProfile
profiler = cProfile.Profile()
profiler = AgentProfiler()
profiler.enable_profiling()
profiled = True
profiler.enable()
log.debug("Agent profiling is enabled")
except Exception:
log.warn("Cannot enable profiler")
except Exception as e:
log.warn("Cannot enable profiler: %s" % str(e))

# Do the work.
self.collector.run(checksd=checksd, start_event=self.start_event)

# disable profiler and printout stats to stdout
if agentConfig.get('profile', False) and agentConfig.get('profile').lower() == 'yes' and profiled:
try:
profiler.disable()
import pstats
from cStringIO import StringIO
s = StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats("cumulative")
ps.print_stats()
log.debug(s.getvalue())
except Exception:
log.warn("Cannot disable profiler")
if profiled:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this code to the new module ?

if collector_profiled_runs >= collector_profile_interval:
try:
profiler.disable_profiling()
profiled = False
collector_profiled_runs = 0
except Exception as e:
log.warn("Cannot disable profiler: %s" % str(e))

# Check if we should restart.
if self.autorestart and self._should_restart():
Expand All @@ -158,8 +161,9 @@ def run(self, config=None):
if self.run_forever:
if watchdog:
watchdog.reset()
if profiled:
collector_profiled_runs += 1
time.sleep(check_frequency)

# Now clean-up.
try:
CollectorStatus.remove_latest_status()
Expand Down Expand Up @@ -212,7 +216,7 @@ def main():
agentConfig = get_config(options=options)
autorestart = agentConfig.get('autorestart', False)
hostname = get_hostname(agentConfig)

in_developer_mode = agentConfig.get('developer_mode')
COMMANDS_AGENT = [
'start',
'stop',
Expand Down Expand Up @@ -247,7 +251,7 @@ def main():
deprecate_old_command_line_tools()

if command in COMMANDS_AGENT:
agent = Agent(PidFile('dd-agent').get_path(), autorestart)
agent = Agent(PidFile('dd-agent').get_path(), autorestart, in_developer_mode=in_developer_mode)

if command in START_COMMANDS:
log.info('Agent version %s' % get_version())
Expand Down Expand Up @@ -301,17 +305,18 @@ def parent_func(): agent.start_event = False
checks = load_check_directory(agentConfig, hostname)
for check in checks['initialized_checks']:
if check.name == check_name:
check.run()
print check.get_metrics()
print check.get_events()
print check.get_service_checks()
if in_developer_mode:
check.run = AgentProfiler.wrap_profiling(check.run)

cs = Collector.run_single_check(check, verbose=True)
print CollectorStatus.render_check_status(cs)

if len(args) == 3 and args[2] == 'check_rate':
print "Running 2nd iteration to capture rate metrics"
time.sleep(1)
check.run()
print check.get_metrics()
print check.get_events()
print check.get_service_checks()
cs = Collector.run_single_check(check, verbose=True)
print CollectorStatus.render_check_status(cs)

check.stop()

elif 'configcheck' == command or 'configtest' == command:
Expand Down
151 changes: 151 additions & 0 deletions checks.d/agent_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import os
import threading

# 3p
try:
import psutil
except ImportError:
psutil = None

# project
from checks import AgentCheck
from checks.metric_types import MetricTypes
from config import _is_affirmative
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ordered #stdlib #3p #project


# Thresholds above which the corresponding agent-internal statistic is
# reported as a metric (and logged) by AgentMetrics.check.
MAX_THREADS_COUNT = 50  # active threads (threading.activeCount())
MAX_COLLECTION_TIME = 30  # seconds spent in a collector run
MAX_EMIT_TIME = 5  # seconds spent emitting the payload
MAX_CPU_PCT = 10  # percent: cpu_time / collection_time * 100


class UnsupportedMetricType(Exception):
    """
    Raised by :class:`AgentMetrics` when a metric type outside of
    AgentMetrics.ALLOWED_METRIC_TYPES is requested for measurement
    of a particular statistic.

    The exception message identifies both the offending statistic
    and the unsupported metric type.
    """
    def __init__(self, metric_name, metric_type):
        message = 'Unsupported Metric Type for {0} : {1}'.format(metric_name, metric_type)
        Exception.__init__(self, message)

class AgentMetrics(AgentCheck):
    """
    New-style version of `CollectorMetrics`
    Gets information about agent performance on every collector loop
    """

    def __init__(self, *args, **kwargs):
        AgentCheck.__init__(self, *args, **kwargs)
        # Payload/context handed over by the collector between runs
        # (see set_metric_context / get_metric_context).
        self._collector_payload = {}
        self._metric_context = {}

    def _psutil_config_to_stats(self, instance):
        """
        Reads `init_config` for `psutil` methods to call on the current process
        Calls those methods and stores the raw output

        :returns a tuple (stats, names_to_metric_types) where `stats` maps
            statistic_name -> raw value and `names_to_metric_types` maps
            statistic_name -> metric type (gauge/rate)
        """
        process_metrics = instance.get('process_metrics', self.init_config.get('process_metrics', None))
        if not process_metrics:
            self.log.error('No metrics configured for AgentMetrics check!')
            # Callers always unpack two values, so return an empty pair
            # (returning a bare `{}` here would raise a ValueError upstream).
            return {}, {}

        active = [(p['name'], p.get('type', MetricTypes.GAUGE))
                  for p in process_metrics if _is_affirmative(p.get('active'))]
        if not active:
            # Guard against `zip(*[])`, which cannot be unpacked into two names.
            self.log.error('No metrics configured for AgentMetrics check!')
            return {}, {}

        methods, metric_types = zip(*active)

        names_to_metric_types = {}
        for i, method in enumerate(methods):
            names_to_metric_types[AgentCheck._get_statistic_name_from_method(method)] = metric_types[i]

        stats = AgentCheck._collect_internal_stats(methods)
        return stats, names_to_metric_types

    def _send_single_metric(self, metric_name, metric_value, metric_type):
        """
        Reports one sample with the appropriate AgentCheck method.

        :raises UnsupportedMetricType: if `metric_type` is neither a gauge nor a rate
        """
        if metric_type == MetricTypes.GAUGE:
            self.gauge(metric_name, metric_value)
        elif metric_type == MetricTypes.RATE:
            self.rate(metric_name, metric_value)
        else:
            raise UnsupportedMetricType(metric_name, metric_type)

    def _register_psutil_metrics(self, stats, names_to_metric_types):
        """
        Saves sample metrics from psutil

        :param stats: a dictionary that looks like:
        {
            'memory_info': OrderedDict([('rss', 24395776), ('vms', 144666624)]),
            'io_counters': OrderedDict([('read_count', 4536),
                                        ('write_count', 100),
                                        ('read_bytes', 0),
                                        ('write_bytes', 61440)])
            ...
        }

        This creates a metric like `datadog.agent.collector.{key_1}.{key_2}` where key_1 is a top-level
        key in `stats`, and key_2 is a nested key.
        E.g. datadog.agent.collector.memory_info.rss
        """
        base_metric = 'datadog.agent.collector.{0}.{1}'
        # TODO: May have to call self.normalize(metric_name) to get a compliant name
        for k, v in stats.items():
            metric_type = names_to_metric_types[k]
            if isinstance(v, dict):
                # Nested stats (e.g. memory_info.rss) get a two-part suffix.
                for _k, _v in v.items():
                    full_metric_name = base_metric.format(k, _k)
                    self._send_single_metric(full_metric_name, _v, metric_type)
            else:
                full_metric_name = 'datadog.agent.collector.{0}'.format(k)
                self._send_single_metric(full_metric_name, v, metric_type)

    def set_metric_context(self, payload, context):
        # Called by the collector to hand over the latest payload and
        # its timing context before this check runs.
        self._collector_payload = payload
        self._metric_context = context

    def get_metric_context(self):
        return self._collector_payload, self._metric_context

    def check(self, instance):
        """
        Reports agent-internal health statistics: thread count, collection
        time, emit time and CPU usage. Timing metrics are sent only when they
        exceed their MAX_* threshold, or unconditionally in developer mode.
        """
        if self.in_developer_mode:
            stats, names_to_metric_types = self._psutil_config_to_stats(instance)
            self._register_psutil_metrics(stats, names_to_metric_types)

        payload, context = self.get_metric_context()
        collection_time = context.get('collection_time', None)
        emit_time = context.get('emit_time', None)
        cpu_time = context.get('cpu_time', None)

        if threading.activeCount() > MAX_THREADS_COUNT:
            self.gauge('datadog.agent.collector.threads.count', threading.activeCount())
            self.log.info("Thread count is high: %d" % threading.activeCount())

        if collection_time is not None:
            # Compare only when we actually have a measurement (comparing
            # None against a number is py2-only behavior).
            collect_time_exceeds_threshold = collection_time > MAX_COLLECTION_TIME
            if collect_time_exceeds_threshold or self.in_developer_mode:
                self.gauge('datadog.agent.collector.collection.time', collection_time)
                if collect_time_exceeds_threshold:
                    self.log.info("Collection time (s) is high: %.1f, metrics count: %d, events count: %d"
                                  % (collection_time, len(payload['metrics']), len(payload['events'])))

        if emit_time is not None:
            emit_time_exceeds_threshold = emit_time > MAX_EMIT_TIME
            if emit_time_exceeds_threshold or self.in_developer_mode:
                self.gauge('datadog.agent.emitter.emit.time', emit_time)
                if emit_time_exceeds_threshold:
                    self.log.info("Emit time (s) is high: %.1f, metrics count: %d, events count: %d"
                                  % (emit_time, len(payload['metrics']), len(payload['events'])))

        if cpu_time is not None:
            try:
                # collection_time may still be None here; the division then
                # raises and is reported at debug level below.
                cpu_used_pct = 100.0 * float(cpu_time) / float(collection_time)
                if cpu_used_pct > MAX_CPU_PCT:
                    self.gauge('datadog.agent.collector.cpu.used', cpu_used_pct)
                    self.log.info("CPU consumed (%%) is high: %.1f, metrics count: %d, events count: %d"
                                  % (cpu_used_pct, len(payload['metrics']), len(payload['events'])))
            except Exception as e:
                self.log.debug("Couldn't compute cpu used by collector with values %s %s %s"
                               % (cpu_time, collection_time, str(e)))
Loading