Skip to content

Commit

Permalink
Major Logging Overhaul
Browse files Browse the repository at this point in the history
in this commit:

- centralize logging output handler.
- set global Info/Debug/Error log levels based on config file or flags.
- remove per-plugin debug arg handling.
- add a I!, D!, or E! to every log message.
- add configuration option to specify where to send logs.

closes #1786
  • Loading branch information
sparrc committed Oct 3, 2016
1 parent 78ced6b commit 75c2a2b
Show file tree
Hide file tree
Showing 51 changed files with 368 additions and 273 deletions.
6 changes: 3 additions & 3 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (ac *accumulator) makeMetric(
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug {
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
log.Printf("I! Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (ac *accumulator) makeMetric(
m, err = telegraf.NewMetric(measurement, tags, fields, timestamp)
}
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
log.Printf("E! Error adding point [%s]: %s\n", measurement, err.Error())
return nil
}

Expand All @@ -182,7 +182,7 @@ func (ac *accumulator) AddError(err error) {
}
atomic.AddUint64(&ac.errCount, 1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
log.Printf("E! Error in input [%s]: %s", ac.inputConfig.Name, err)
}

func (ac *accumulator) Debug() bool {
Expand Down
42 changes: 17 additions & 25 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,24 @@ func (a *Agent) Connect() error {
switch ot := o.Output.(type) {
case telegraf.ServiceOutput:
if err := ot.Start(); err != nil {
log.Printf("Service for output %s failed to start, exiting\n%s\n",
log.Printf("E! Service for output %s failed to start, exiting\n%s\n",
o.Name, err.Error())
return err
}
}

if a.Config.Agent.Debug {
log.Printf("Attempting connection to output: %s\n", o.Name)
}
log.Printf("D! Attempting connection to output: %s\n", o.Name)
err := o.Output.Connect()
if err != nil {
log.Printf("Failed to connect to output %s, retrying in 15s, "+
log.Printf("E! Failed to connect to output %s, retrying in 15s, "+
"error was '%s' \n", o.Name, err)
time.Sleep(15 * time.Second)
err = o.Output.Connect()
if err != nil {
return err
}
}
if a.Config.Agent.Debug {
log.Printf("Successfully connected to output: %s\n", o.Name)
}
log.Printf("D! Successfully connected to output: %s\n", o.Name)
}
return nil
}
Expand All @@ -92,9 +88,9 @@ func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil {
trace := make([]byte, 2048)
runtime.Stack(trace, true)
log.Printf("FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
input.Name, err, trace)
log.Println("PLEASE REPORT THIS PANIC ON GITHUB with " +
log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
"stack trace, configuration, and OS information: " +
"https://github.com/influxdata/telegraf/issues/new")
}
Expand All @@ -117,7 +113,6 @@ func (a *Agent) gatherer(
var outerr error

acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
acc.setDefaultTags(a.Config.Tags)
Expand All @@ -131,10 +126,8 @@ func (a *Agent) gatherer(
if outerr != nil {
return outerr
}
if a.Config.Agent.Debug {
log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
}
log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)

select {
case <-shutdown:
Expand Down Expand Up @@ -167,11 +160,11 @@ func gatherWithTimeout(
select {
case err := <-done:
if err != nil {
log.Printf("ERROR in input [%s]: %s", input.Name, err)
log.Printf("E! ERROR in input [%s]: %s", input.Name, err)
}
return
case <-ticker.C:
log.Printf("ERROR: input [%s] took longer to collect than "+
log.Printf("E! ERROR: input [%s] took longer to collect than "+
"collection interval (%s)",
input.Name, timeout)
continue
Expand Down Expand Up @@ -244,7 +237,7 @@ func (a *Agent) flush() {
defer wg.Done()
err := output.Write()
if err != nil {
log.Printf("Error writing to output [%s]: %s\n",
log.Printf("E! Error writing to output [%s]: %s\n",
output.Name, err.Error())
}
}(o)
Expand All @@ -264,7 +257,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached metrics before shutdown")
log.Println("I! Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
Expand Down Expand Up @@ -302,9 +295,9 @@ func copyMetric(m telegraf.Metric) telegraf.Metric {
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup

log.Printf("Agent Config: Interval:%s, Debug:%#v, Quiet:%#v, Hostname:%#v, "+
log.Printf("I! Agent Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
"Flush Interval:%s \n",
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,
a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

// channel shared between all input threads for accumulating metrics
Expand All @@ -315,13 +308,12 @@ func (a *Agent) Run(shutdown chan struct{}) error {
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
// Service input plugins should set their own precision of their
// metrics.
acc.DisablePrecision()
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
}
Expand All @@ -339,7 +331,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
go func() {
defer wg.Done()
if err := a.flusher(shutdown, metricC); err != nil {
log.Printf("Flusher routine failed, exiting: %s\n", err.Error())
log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error())
close(shutdown)
}
}()
Expand All @@ -354,7 +346,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
go func(in *models.RunningInput, interv time.Duration) {
defer wg.Done()
if err := a.gatherer(shutdown, in, interv, metricC); err != nil {
log.Printf(err.Error())
log.Printf("E! " + err.Error())
}
}(input, interval)
}
Expand Down
44 changes: 20 additions & 24 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"

"github.com/kardianos/service"
)

var fDebug = flag.Bool("debug", false,
"show metrics as they're generated to stdout")
"turn on debug logging")
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
Expand Down Expand Up @@ -109,12 +111,11 @@ Examples:
telegraf -config telegraf.conf -input-filter cpu:mem -output-filter influxdb
`

var logger service.Logger
// var logger service.Logger

var stop chan struct{}

var srvc service.Service
var svcConfig *service.Config

type program struct{}

Expand Down Expand Up @@ -212,13 +213,12 @@ func reloadLoop(stop chan struct{}, s service.Service) {
log.Fatal(err)
}

if *fDebug {
ag.Config.Agent.Debug = true
}

if *fQuiet {
ag.Config.Agent.Quiet = true
}
// Setup logging
logger.SetupLogging(
ag.Config.Agent.Debug || *fDebug,
ag.Config.Agent.Quiet || *fQuiet,
ag.Config.Agent.Logfile,
)

if *fTest {
err = ag.Test()
Expand All @@ -243,7 +243,7 @@ func reloadLoop(stop chan struct{}, s service.Service) {
close(shutdown)
}
if sig == syscall.SIGHUP {
log.Printf("Reloading Telegraf config\n")
log.Printf("I! Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
Expand All @@ -253,10 +253,10 @@ func reloadLoop(stop chan struct{}, s service.Service) {
}
}()

log.Printf("Starting Telegraf (version %s)\n", version)
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
log.Printf("Tags enabled: %s", c.ListTags())
log.Printf("I! Starting Telegraf (version %s)\n", version)
log.Printf("I! Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
log.Printf("I! Tags enabled: %s", c.ListTags())

if *fPidfile != "" {
f, err := os.Create(*fPidfile)
Expand Down Expand Up @@ -293,8 +293,9 @@ func (p *program) Stop(s service.Service) error {
}

func main() {
flag.Parse()
if runtime.GOOS == "windows" {
svcConfig = &service.Config{
svcConfig := &service.Config{
Name: "telegraf",
DisplayName: "Telegraf Data Collector Service",
Description: "Collects data using a series of plugins and publishes it to" +
Expand All @@ -307,13 +308,8 @@ func main() {
if err != nil {
log.Fatal(err)
}
logger, err = s.Logger(nil)
if err != nil {
log.Fatal(err)
}
// Handle the -service flag here to prevent any issues with tooling that may not have an interactive
// session, e.g. installing from Ansible
flag.Parse()
// Handle the -service flag here to prevent any issues with tooling that
// may not have an interactive session, e.g. installing from Ansible.
if *fService != "" {
if *fConfig != "" {
(*svcConfig).Arguments = []string{"-config", *fConfig}
Expand All @@ -325,7 +321,7 @@ func main() {
} else {
err = s.Run()
if err != nil {
logger.Error(err)
log.Println("E! " + err.Error())
}
}
} else {
Expand Down
44 changes: 27 additions & 17 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

## Telegraf will send metrics to outputs in batches of at
## most metric_batch_size metrics.
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000

## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000

## Collection jitter is used to jitter the collection by a random amount.
Expand All @@ -57,10 +60,15 @@
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Run telegraf in debug mode

## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stdout.
logfile = ""

## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
Expand Down Expand Up @@ -1064,8 +1072,6 @@
# # "tasks",
# # "messages",
# # ]
# ## Include mesos tasks statistics, default is false
# # slave_tasks = true


# # Read metrics from one or many MongoDB servers
Expand Down Expand Up @@ -1442,25 +1448,29 @@
# # Retrieves SNMP values from remote agents
# [[inputs.snmp]]
# agents = [ "127.0.0.1:161" ]
# ## Timeout for each SNMP query.
# timeout = "5s"
# ## Number of retries to attempt within timeout.
# retries = 3
# ## SNMP version, values can be 1, 2, or 3
# version = 2
#
# # SNMPv1 & SNMPv2 parameters
# ## SNMP community string.
# community = "public"
#
# # SNMPv2 & SNMPv3 parameters
# max_repetitions = 50
# ## The GETBULK max-repetitions parameter
# max_repetitions = 10
#
# # SNMPv3 parameters
# ## SNMPv3 auth parameters
# #sec_name = "myuser"
# #auth_protocol = "md5" # Values: "MD5", "SHA", ""
# #auth_password = "password123"
# #sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv"
# #auth_protocol = "md5" # Values: "MD5", "SHA", ""
# #auth_password = "pass"
# #sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv"
# #context_name = ""
# #priv_protocol = "" # Values: "DES", "AES", ""
# #priv_protocol = "" # Values: "DES", "AES", ""
# #priv_password = ""
#
# # measurement name
# ## measurement name
# name = "system"
# [[inputs.snmp.field]]
# name = "hostname"
Expand All @@ -1475,7 +1485,7 @@
# oid = "HOST-RESOURCES-MIB::hrMemorySize"
#
# [[inputs.snmp.table]]
# # measurement name
# ## measurement name
# name = "remote_servers"
# inherit_tags = [ "hostname" ]
# [[inputs.snmp.table.field]]
Expand All @@ -1490,7 +1500,7 @@
# oid = ".1.0.0.0.1.2"
#
# [[inputs.snmp.table]]
# # auto populate table's fields using the MIB
# ## auto populate table's fields using the MIB
# oid = "HOST-RESOURCES-MIB::hrNetworkTable"


Expand Down
Loading

0 comments on commit 75c2a2b

Please sign in to comment.