From 75c2a2bec38706a585ae18ac1a08d43bbf937f53 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 30 Sep 2016 22:37:56 +0100 Subject: [PATCH] Major Logging Overhaul in this commit: - centralize logging output handler. - set global Info/Debug/Error log levels based on config file or flags. - remove per-plugin debug arg handling. - add a I!, D!, or E! to every log message. - add configuration option to specify where to send logs. closes #1786 --- agent/accumulator.go | 6 +- agent/agent.go | 42 +++--- cmd/telegraf/telegraf.go | 44 +++--- etc/telegraf.conf | 44 +++--- etc/telegraf_windows.conf | 130 +++++++++++------- internal/config/config.go | 27 ++-- internal/internal.go | 2 +- internal/models/running_output.go | 4 +- logger/logger.go | 58 ++++++++ plugins/inputs/aerospike/aerospike.go | 4 +- plugins/inputs/cassandra/cassandra.go | 2 +- plugins/inputs/ceph/ceph.go | 6 +- plugins/inputs/conntrack/conntrack.go | 5 +- plugins/inputs/docker/docker.go | 2 +- plugins/inputs/http_listener/http_listener.go | 4 +- .../inputs/kafka_consumer/kafka_consumer.go | 10 +- plugins/inputs/logparser/grok/grok.go | 14 +- plugins/inputs/logparser/logparser.go | 6 +- plugins/inputs/mailchimp/chimp_api.go | 4 +- plugins/inputs/mesos/mesos.go | 4 +- plugins/inputs/mongodb/mongodb_server.go | 6 +- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 8 +- plugins/inputs/nats_consumer/nats_consumer.go | 8 +- plugins/inputs/nsq_consumer/nsq_consumer.go | 2 +- plugins/inputs/ntpq/ntpq.go | 10 +- .../postgresql_extensible.go | 4 +- plugins/inputs/powerdns/powerdns.go | 2 +- plugins/inputs/procstat/procstat.go | 2 +- plugins/inputs/snmp_legacy/snmp_legacy.go | 10 +- plugins/inputs/statsd/statsd.go | 26 ++-- plugins/inputs/sysstat/sysstat.go | 2 +- plugins/inputs/system/processes.go | 6 +- plugins/inputs/tail/tail.go | 8 +- plugins/inputs/tcp_listener/tcp_listener.go | 14 +- plugins/inputs/udp_listener/udp_listener.go | 12 +- .../webhooks/filestack/filestack_webhooks.go | 2 +- .../inputs/webhooks/github/github_webhooks.go | 4 +- .../webhooks/mandrill/mandrill_webhooks.go | 2 +- .../webhooks/rollbar/rollbar_webhooks.go | 2 +- plugins/inputs/webhooks/webhooks.go | 6 +- plugins/outputs/amon/amon.go | 2 +- plugins/outputs/amqp/amqp.go | 6 +- plugins/outputs/cloudwatch/cloudwatch.go | 4 +- plugins/outputs/datadog/datadog.go | 2 +- plugins/outputs/graphite/graphite.go | 4 +- plugins/outputs/influxdb/influxdb.go | 6 +- plugins/outputs/instrumental/instrumental.go | 8 +- plugins/outputs/kinesis/kinesis.go | 16 +-- plugins/outputs/librato/librato.go | 31 ++--- plugins/outputs/opentsdb/opentsdb_http.go | 6 +- .../prometheus_client/prometheus_client.go | 2 +- 51 files changed, 368 insertions(+), 273 deletions(-) create mode 100644 logger/logger.go diff --git a/agent/accumulator.go b/agent/accumulator.go index a0d0461a4cef4..752e2b91ffe23 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -132,7 +132,7 @@ func (ac *accumulator) makeMetric( // NaNs are invalid values in influxdb, skip measurement if math.IsNaN(val) || math.IsInf(val, 0) { if ac.debug { - log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ + log.Printf("I! Measurement [%s] field [%s] has a NaN or Inf "+ "field, skipping", measurement, k) } @@ -163,7 +163,7 @@ func (ac *accumulator) makeMetric( m, err = telegraf.NewMetric(measurement, tags, fields, timestamp) } if err != nil { - log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) + log.Printf("E! Error adding point [%s]: %s\n", measurement, err.Error()) return nil } @@ -182,7 +182,7 @@ func (ac *accumulator) AddError(err error) { } atomic.AddUint64(&ac.errCount, 1) //TODO suppress/throttle consecutive duplicate errors? - log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err) + log.Printf("E! Error in input [%s]: %s", ac.inputConfig.Name, err) } func (ac *accumulator) Debug() bool { diff --git a/agent/agent.go b/agent/agent.go index d86037e79edeb..8fef8ca41383a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -49,18 +49,16 @@ func (a *Agent) Connect() error { switch ot := o.Output.(type) { case telegraf.ServiceOutput: if err := ot.Start(); err != nil { - log.Printf("Service for output %s failed to start, exiting\n%s\n", + log.Printf("E! Service for output %s failed to start, exiting\n%s\n", o.Name, err.Error()) return err } } - if a.Config.Agent.Debug { - log.Printf("Attempting connection to output: %s\n", o.Name) - } + log.Printf("D! Attempting connection to output: %s\n", o.Name) err := o.Output.Connect() if err != nil { - log.Printf("Failed to connect to output %s, retrying in 15s, "+ + log.Printf("E! Failed to connect to output %s, retrying in 15s, "+ "error was '%s' \n", o.Name, err) time.Sleep(15 * time.Second) err = o.Output.Connect() @@ -68,9 +66,7 @@ func (a *Agent) Connect() error { return err } } - if a.Config.Agent.Debug { - log.Printf("Successfully connected to output: %s\n", o.Name) - } + log.Printf("D! Successfully connected to output: %s\n", o.Name) } return nil } @@ -92,9 +88,9 @@ func panicRecover(input *models.RunningInput) { if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) - log.Printf("FATAL: Input [%s] panicked: %s, Stack:\n%s\n", + log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n", input.Name, err, trace) - log.Println("PLEASE REPORT THIS PANIC ON GITHUB with " + + log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " + "stack trace, configuration, and OS information: " + "https://github.com/influxdata/telegraf/issues/new") } @@ -117,7 +113,6 @@ func (a *Agent) gatherer( var outerr error acc := NewAccumulator(input.Config, metricC) - acc.SetDebug(a.Config.Agent.Debug) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) acc.setDefaultTags(a.Config.Tags) @@ -131,10 +126,8 @@ func (a *Agent) gatherer( if outerr != nil { return outerr } - if a.Config.Agent.Debug { - log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n", - input.Name, interval, elapsed) - } + log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n", + input.Name, interval, elapsed) select { case <-shutdown: @@ -167,11 +160,11 @@ func gatherWithTimeout( select { case err := <-done: if err != nil { - log.Printf("ERROR in input [%s]: %s", input.Name, err) + log.Printf("E! ERROR in input [%s]: %s", input.Name, err) } return case <-ticker.C: - log.Printf("ERROR: input [%s] took longer to collect than "+ + log.Printf("E! ERROR: input [%s] took longer to collect than "+ "collection interval (%s)", input.Name, timeout) continue @@ -244,7 +237,7 @@ func (a *Agent) flush() { defer wg.Done() err := output.Write() if err != nil { - log.Printf("Error writing to output [%s]: %s\n", + log.Printf("E! Error writing to output [%s]: %s\n", output.Name, err.Error()) } }(o) @@ -264,7 +257,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er for { select { case <-shutdown: - log.Println("Hang on, flushing any cached metrics before shutdown") + log.Println("I! Hang on, flushing any cached metrics before shutdown") a.flush() return nil case <-ticker.C: @@ -302,9 +295,9 @@ func copyMetric(m telegraf.Metric) telegraf.Metric { func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup - log.Printf("Agent Config: Interval:%s, Debug:%#v, Quiet:%#v, Hostname:%#v, "+ + log.Printf("I! Agent Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+ "Flush Interval:%s \n", - a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet, + a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet, a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) // channel shared between all input threads for accumulating metrics @@ -315,13 +308,12 @@ func (a *Agent) Run(shutdown chan struct{}) error { switch p := input.Input.(type) { case telegraf.ServiceInput: acc := NewAccumulator(input.Config, metricC) - acc.SetDebug(a.Config.Agent.Debug) // Service input plugins should set their own precision of their // metrics. acc.DisablePrecision() acc.setDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { - log.Printf("Service for input %s failed to start, exiting\n%s\n", + log.Printf("E! Service for input %s failed to start, exiting\n%s\n", input.Name, err.Error()) return err } @@ -339,7 +331,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { go func() { defer wg.Done() if err := a.flusher(shutdown, metricC); err != nil { - log.Printf("Flusher routine failed, exiting: %s\n", err.Error()) + log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error()) close(shutdown) } }() @@ -354,7 +346,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { go func(in *models.RunningInput, interv time.Duration) { defer wg.Done() if err := a.gatherer(shutdown, in, interv, metricC); err != nil { - log.Printf(err.Error()) + log.Printf("E! " + err.Error()) } }(input, interval) } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 38ae1b00ada88..2200460d1c70e 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -12,15 +12,17 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/logger" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" _ "github.com/influxdata/telegraf/plugins/outputs/all" + "github.com/kardianos/service" ) var fDebug = flag.Bool("debug", false, - "show metrics as they're generated to stdout") + "turn on debug logging") var fQuiet = flag.Bool("quiet", false, "run in quiet mode") var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit") @@ -109,12 +111,11 @@ Examples: telegraf -config telegraf.conf -input-filter cpu:mem -output-filter influxdb ` -var logger service.Logger +// var logger service.Logger var stop chan struct{} var srvc service.Service -var svcConfig *service.Config type program struct{} @@ -212,13 +213,12 @@ func reloadLoop(stop chan struct{}, s service.Service) { log.Fatal(err) } - if *fDebug { - ag.Config.Agent.Debug = true - } - - if *fQuiet { - ag.Config.Agent.Quiet = true - } + // Setup logging + logger.SetupLogging( + ag.Config.Agent.Debug || *fDebug, + ag.Config.Agent.Quiet || *fQuiet, + ag.Config.Agent.Logfile, + ) if *fTest { err = ag.Test() @@ -243,7 +243,7 @@ func reloadLoop(stop chan struct{}, s service.Service) { close(shutdown) } if sig == syscall.SIGHUP { - log.Printf("Reloading Telegraf config\n") + log.Printf("I! Reloading Telegraf config\n") <-reload reload <- true close(shutdown) @@ -253,10 +253,10 @@ func reloadLoop(stop chan struct{}, s service.Service) { } }() - log.Printf("Starting Telegraf (version %s)\n", version) - log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) - log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " ")) - log.Printf("Tags enabled: %s", c.ListTags()) + log.Printf("I! Starting Telegraf (version %s)\n", version) + log.Printf("I! Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) + log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " ")) + log.Printf("I! Tags enabled: %s", c.ListTags()) if *fPidfile != "" { f, err := os.Create(*fPidfile) @@ -293,8 +293,9 @@ func (p *program) Stop(s service.Service) error { } func main() { + flag.Parse() if runtime.GOOS == "windows" { - svcConfig = &service.Config{ + svcConfig := &service.Config{ Name: "telegraf", DisplayName: "Telegraf Data Collector Service", Description: "Collects data using a series of plugins and publishes it to" + @@ -307,13 +308,8 @@ func main() { if err != nil { log.Fatal(err) } - logger, err = s.Logger(nil) - if err != nil { - log.Fatal(err) - } - // Handle the -service flag here to prevent any issues with tooling that may not have an interactive - // session, e.g. installing from Ansible - flag.Parse() + // Handle the -service flag here to prevent any issues with tooling that + // may not have an interactive session, e.g. installing from Ansible. if *fService != "" { if *fConfig != "" { (*svcConfig).Arguments = []string{"-config", *fConfig} @@ -325,7 +321,7 @@ func main() { } else { err = s.Run() if err != nil { - logger.Error(err) + log.Println("E! " + err.Error()) } } } else { diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 5652a7a8b2880..2ad0bcbae66d2 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -30,12 +30,15 @@ ## ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true - ## Telegraf will send metrics to outputs in batches of at - ## most metric_batch_size metrics. + ## Telegraf will send metrics to outputs in batches of at most + ## metric_batch_size metrics. + ## This controls the size of writes that Telegraf sends to output plugins. metric_batch_size = 1000 + ## For failed writes, telegraf will cache metric_buffer_limit metrics for each ## output, and will flush this buffer on a successful write. Oldest metrics ## are dropped first when this buffer fills. + ## This buffer only fills when writes fail to output plugin(s). metric_buffer_limit = 10000 ## Collection jitter is used to jitter the collection by a random amount. @@ -57,10 +60,15 @@ ## Precision will NOT be used for service inputs, such as logparser and statsd. ## Valid values are "ns", "us" (or "µs"), "ms", "s". precision = "" - ## Run telegraf in debug mode + + ## Logging configuration: + ## Run telegraf with debug log messages. debug = false - ## Run telegraf in quiet mode + ## Run telegraf in quiet mode (error log messages only). quiet = false + ## Specify the log file name. The empty string means to log to stdout. + logfile = "" + ## Override default hostname, if empty use os.Hostname() hostname = "" ## If set to true, do no set the "host" tag in the telegraf agent. @@ -1064,8 +1072,6 @@ # # "tasks", # # "messages", # # ] -# ## Include mesos tasks statistics, default is false -# # slave_tasks = true # # Read metrics from one or many MongoDB servers @@ -1442,25 +1448,29 @@ # # Retrieves SNMP values from remote agents # [[inputs.snmp]] # agents = [ "127.0.0.1:161" ] +# ## Timeout for each SNMP query. # timeout = "5s" +# ## Number of retries to attempt within timeout. +# retries = 3 +# ## SNMP version, values can be 1, 2, or 3 # version = 2 # -# # SNMPv1 & SNMPv2 parameters +# ## SNMP community string. # community = "public" # -# # SNMPv2 & SNMPv3 parameters -# max_repetitions = 50 +# ## The GETBULK max-repetitions parameter +# max_repetitions = 10 # -# # SNMPv3 parameters +# ## SNMPv3 auth parameters # #sec_name = "myuser" -# #auth_protocol = "md5" # Values: "MD5", "SHA", "" -# #auth_password = "password123" -# #sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv" +# #auth_protocol = "md5" # Values: "MD5", "SHA", "" +# #auth_password = "pass" +# #sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv" # #context_name = "" -# #priv_protocol = "" # Values: "DES", "AES", "" +# #priv_protocol = "" # Values: "DES", "AES", "" # #priv_password = "" # -# # measurement name +# ## measurement name # name = "system" # [[inputs.snmp.field]] # name = "hostname" @@ -1475,7 +1485,7 @@ # oid = "HOST-RESOURCES-MIB::hrMemorySize" # # [[inputs.snmp.table]] -# # measurement name +# ## measurement name # name = "remote_servers" # inherit_tags = [ "hostname" ] # [[inputs.snmp.table.field]] @@ -1490,7 +1500,7 @@ # oid = ".1.0.0.0.1.2" # # [[inputs.snmp.table]] -# # auto populate table's fields using the MIB +# ## auto populate table's fields using the MIB # oid = "HOST-RESOURCES-MIB::hrNetworkTable" diff --git a/etc/telegraf_windows.conf b/etc/telegraf_windows.conf index 9ce067c3925de..4825d715a6b7c 100644 --- a/etc/telegraf_windows.conf +++ b/etc/telegraf_windows.conf @@ -42,10 +42,14 @@ ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" + ## Logging configuration: ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode quiet = false + ## Specify the log file name. The empty string means to log to stdout. + logfile = "/Program Files/Telegraf/telegraf.log" + ## Override default hostname, if empty use os.Hostname() hostname = "" @@ -85,7 +89,7 @@ # Windows Performance Counters plugin. # These are the recommended method of monitoring system metrics on windows, # as the regular system plugins (inputs.cpu, inputs.mem, etc.) rely on WMI, -# which utilizes a lot of system resources. +# which utilize more system resources. # # See more configuration examples at: # https://github.com/influxdata/telegraf/tree/master/plugins/inputs/win_perf_counters @@ -95,70 +99,104 @@ # Processor usage, alternative to native, reports on a per core. ObjectName = "Processor" Instances = ["*"] - Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"] + Counters = [ + "% Idle Time", + "% Interrupt Time", + "% Privileged Time", + "% User Time", + "% Processor Time", + ] Measurement = "win_cpu" - #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + # Set to true to include _Total instance when querying for all (*). + #IncludeTotal=false [[inputs.win_perf_counters.object]] # Disk times and queues ObjectName = "LogicalDisk" Instances = ["*"] - Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"] + Counters = [ + "% Idle Time", + "% Disk Time","% Disk Read Time", + "% Disk Write Time", + "% User Time", + "Current Disk Queue Length", + ] Measurement = "win_disk" - #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + # Set to true to include _Total instance when querying for all (*). + #IncludeTotal=false [[inputs.win_perf_counters.object]] ObjectName = "System" - Counters = ["Context Switches/sec","System Calls/sec"] + Counters = [ + "Context Switches/sec", + "System Calls/sec", + ] Instances = ["------"] Measurement = "win_system" - #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + # Set to true to include _Total instance when querying for all (*). + #IncludeTotal=false [[inputs.win_perf_counters.object]] - # Example query where the Instance portion must be removed to get data back, such as from the Memory object. + # Example query where the Instance portion must be removed to get data back, + # such as from the Memory object. ObjectName = "Memory" - Counters = ["Available Bytes","Cache Faults/sec","Demand Zero Faults/sec","Page Faults/sec","Pages/sec","Transition Faults/sec","Pool Nonpaged Bytes","Pool Paged Bytes"] - Instances = ["------"] # Use 6 x - to remove the Instance bit from the query. + Counters = [ + "Available Bytes", + "Cache Faults/sec", + "Demand Zero Faults/sec", + "Page Faults/sec", + "Pages/sec", + "Transition Faults/sec", + "Pool Nonpaged Bytes", + "Pool Paged Bytes", + ] + # Use 6 x - to remove the Instance bit from the query. + Instances = ["------"] Measurement = "win_mem" - #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + # Set to true to include _Total instance when querying for all (*). + #IncludeTotal=false # Windows system plugins using WMI (disabled by default, using # win_perf_counters over WMI is recommended) -# Read metrics about cpu usage -#[[inputs.cpu]] - ## Whether to report per-cpu stats or not - #percpu = true - ## Whether to report total system cpu stats or not - #totalcpu = true - ## Comment this line if you want the raw CPU time metrics - #fielddrop = ["time_*"] - -# Read metrics about disk usage by mount point -#[[inputs.disk]] - ## By default, telegraf gather stats for all mountpoints. - ## Setting mountpoints will restrict the stats to the specified mountpoints. - ## mount_points=["/"] - - ## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually - ## present on /run, /var/run, /dev/shm or /dev). - #ignore_fs = ["tmpfs", "devtmpfs"] - -# Read metrics about disk IO by device -#[[inputs.diskio]] - ## By default, telegraf will gather stats for all devices including - ## disk partitions. - ## Setting devices will restrict the stats to the specified devices. - ## devices = ["sda", "sdb"] - ## Uncomment the following line if you do not need disk serial numbers. - ## skip_serial_number = true - -# Read metrics about memory usage -#[[inputs.mem]] - # no configuration - -# Read metrics about swap memory usage -#[[inputs.swap]] - # no configuration +# # Read metrics about cpu usage +# [[inputs.cpu]] +# ## Whether to report per-cpu stats or not +# percpu = true +# ## Whether to report total system cpu stats or not +# totalcpu = true +# ## Comment this line if you want the raw CPU time metrics +# fielddrop = ["time_*"] + + +# # Read metrics about disk usage by mount point +# [[inputs.disk]] +# ## By default, telegraf gather stats for all mountpoints. +# ## Setting mountpoints will restrict the stats to the specified mountpoints. +# ## mount_points=["/"] +# +# ## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually +# ## present on /run, /var/run, /dev/shm or /dev). +# # ignore_fs = ["tmpfs", "devtmpfs"] + + +# # Read metrics about disk IO by device +# [[inputs.diskio]] +# ## By default, telegraf will gather stats for all devices including +# ## disk partitions. +# ## Setting devices will restrict the stats to the specified devices. +# ## devices = ["sda", "sdb"] +# ## Uncomment the following line if you do not need disk serial numbers. +# ## skip_serial_number = true + + +# # Read metrics about memory usage +# [[inputs.mem]] +# # no configuration + + +# # Read metrics about swap memory usage +# [[inputs.swap]] +# # no configuration diff --git a/internal/config/config.go b/internal/config/config.go index 077aa3076bba4..b76c9b5207547 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -125,6 +125,9 @@ type AgentConfig struct { // Debug is the option for running in debug mode Debug bool + // Logfile specifies the file to send logs to + Logfile string + // Quiet is the option for running in quiet mode Quiet bool Hostname string @@ -195,12 +198,15 @@ var header = `# Telegraf Configuration ## ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true - ## Telegraf will send metrics to outputs in batches of at - ## most metric_batch_size metrics. + ## Telegraf will send metrics to outputs in batches of at most + ## metric_batch_size metrics. + ## This controls the size of writes that Telegraf sends to output plugins. metric_batch_size = 1000 + ## For failed writes, telegraf will cache metric_buffer_limit metrics for each ## output, and will flush this buffer on a successful write. Oldest metrics ## are dropped first when this buffer fills. + ## This buffer only fills when writes fail to output plugin(s). metric_buffer_limit = 10000 ## Collection jitter is used to jitter the collection by a random amount. @@ -222,10 +228,15 @@ var header = `# Telegraf Configuration ## Precision will NOT be used for service inputs, such as logparser and statsd. ## Valid values are "ns", "us" (or "µs"), "ms", "s". precision = "" - ## Run telegraf in debug mode + + ## Logging configuration: + ## Run telegraf with debug log messages. debug = false - ## Run telegraf in quiet mode + ## Run telegraf in quiet mode (error log messages only). quiet = false + ## Specify the log file name. The empty string means to log to stdout. + logfile = "" + ## Override default hostname, if empty use os.Hostname() hostname = "" ## If set to true, do no set the "host" tag in the telegraf agent. @@ -435,7 +446,7 @@ func getDefaultConfigPath() (string, error) { } for _, path := range []string{envfile, homefile, etcfile} { if _, err := os.Stat(path); err == nil { - log.Printf("Using config file: %s", path) + log.Printf("I! Using config file: %s", path) return path, nil } } @@ -466,7 +477,7 @@ func (c *Config) LoadConfig(path string) error { return fmt.Errorf("%s: invalid configuration", path) } if err = config.UnmarshalTable(subTable, c.Tags); err != nil { - log.Printf("Could not parse [global_tags] config\n") + log.Printf("E! Could not parse [global_tags] config\n") return fmt.Errorf("Error parsing %s, %s", path, err) } } @@ -479,7 +490,7 @@ func (c *Config) LoadConfig(path string) error { return fmt.Errorf("%s: invalid configuration", path) } if err = config.UnmarshalTable(subTable, c.Agent); err != nil { - log.Printf("Could not parse [agent] config\n") + log.Printf("E! Could not parse [agent] config\n") return fmt.Errorf("Error parsing %s, %s", path, err) } } @@ -832,7 +843,7 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { if node, ok := tbl.Fields["tags"]; ok { if subtbl, ok := node.(*ast.Table); ok { if err := config.UnmarshalTable(subtbl, cp.Tags); err != nil { - log.Printf("Could not parse tags for input %s\n", name) + log.Printf("E! Could not parse tags for input %s\n", name) } } } diff --git a/internal/internal.go b/internal/internal.go index 58a1200e06929..664a1d13b934b 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -198,7 +198,7 @@ func WaitTimeout(c *exec.Cmd, timeout time.Duration) error { return err case <-timer.C: if err := c.Process.Kill(); err != nil { - log.Printf("FATAL error killing process: %s", err) + log.Printf("E! FATAL error killing process: %s", err) return err } // wait for the command to return after killing it diff --git a/internal/models/running_output.go b/internal/models/running_output.go index c4de4afd95d7f..aa94178f74145 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -85,7 +85,7 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { if !ro.Quiet { - log.Printf("Output [%s] buffer fullness: %d / %d metrics. "+ + log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ "Total gathered metrics: %d. Total dropped metrics: %d.", ro.Name, ro.failMetrics.Len()+ro.metrics.Len(), @@ -142,7 +142,7 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error { elapsed := time.Since(start) if err == nil { if !ro.Quiet { - log.Printf("Output [%s] wrote batch of %d metrics in %s\n", + log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n", ro.Name, len(metrics), elapsed) } } diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000000000..fabaabf39fde8 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,58 @@ +package logger + +import ( + "io" + "log" + "os" + + "github.com/influxdata/wlog" +) + +// newTelegrafWriter returns a logging-wrapped writer. +func newTelegrafWriter(w io.Writer) io.Writer { + return &telegrafLog{ + writer: wlog.NewWriter(w), + } +} + +type telegrafLog struct { + writer io.Writer +} + +func (t *telegrafLog) Write(p []byte) (n int, err error) { + return t.writer.Write(p) +} + +// SetupLogging configures the logging output. +// debug will set the log level to DEBUG +// quiet will set the log level to ERROR +// logfile will direct the logging output to a file. Empty string is +// interpreted as stdout. If there is an error opening the file the +// logger will fallback to stdout. +func SetupLogging(debug, quiet bool, logfile string) { + if debug { + wlog.SetLevel(wlog.DEBUG) + } + if quiet { + wlog.SetLevel(wlog.ERROR) + } + + var oFile *os.File + if logfile != "" { + if _, err := os.Stat(logfile); os.IsNotExist(err) { + if oFile, err = os.Create(logfile); err != nil { + log.Printf("E! Unable to create %s (%s), using stdout", logfile, err) + oFile = os.Stdout + } + } else { + if oFile, err = os.OpenFile(logfile, os.O_APPEND|os.O_WRONLY, os.ModeAppend); err != nil { + log.Printf("E! Unable to append to %s (%s), using stdout", logfile, err) + oFile = os.Stdout + } + } + } else { + oFile = os.Stdout + } + + log.SetOutput(newTelegrafWriter(oFile)) +} diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index f7c90019553eb..c71312aded247 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -88,7 +88,7 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro if err == nil { fields[strings.Replace(k, "-", "_", -1)] = val } else { - log.Printf("skipping aerospike field %v with int64 overflow", k) + log.Printf("I! I! skipping aerospike field %v with int64 overflow", k) } } acc.AddFields("aerospike_node", fields, tags, time.Now()) @@ -121,7 +121,7 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro if err == nil { nFields[strings.Replace(parts[0], "-", "_", -1)] = val } else { - log.Printf("skipping aerospike field %v with int64 overflow", parts[0]) + log.Printf("I! skipping aerospike field %v with int64 overflow", parts[0]) } } acc.AddFields("aerospike_namespace", nFields, nTags, time.Now()) diff --git a/plugins/inputs/cassandra/cassandra.go b/plugins/inputs/cassandra/cassandra.go index e7edf7153980f..dc4bb2b720e1c 100644 --- a/plugins/inputs/cassandra/cassandra.go +++ b/plugins/inputs/cassandra/cassandra.go @@ -274,7 +274,7 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error { m = newCassandraMetric(serverTokens["host"], metric, acc) } else { // unsupported metric type - log.Printf("Unsupported Cassandra metric [%s], skipping", + log.Printf("I! Unsupported Cassandra metric [%s], skipping", metric) continue } diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go index d5ed464fa5a38..9f0a6ac786257 100644 --- a/plugins/inputs/ceph/ceph.go +++ b/plugins/inputs/ceph/ceph.go @@ -100,12 +100,12 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error { for _, s := range sockets { dump, err := perfDump(c.CephBinary, s) if err != nil { - log.Printf("error reading from socket '%s': %v", s.socket, err) + log.Printf("E! error reading from socket '%s': %v", s.socket, err) continue } data, err := parseDump(dump) if err != nil { - log.Printf("error parsing dump from socket '%s': %v", s.socket, err) + log.Printf("E! error parsing dump from socket '%s': %v", s.socket, err) continue } for tag, metrics := range *data { @@ -293,7 +293,7 @@ func flatten(data interface{}) []*metric { } } default: - log.Printf("Ignoring unexpected type '%T' for value %v", val, val) + log.Printf("I! Ignoring unexpected type '%T' for value %v", val, val) } return metrics diff --git a/plugins/inputs/conntrack/conntrack.go b/plugins/inputs/conntrack/conntrack.go index 68bf8adba52d5..841aedb545a03 100644 --- a/plugins/inputs/conntrack/conntrack.go +++ b/plugins/inputs/conntrack/conntrack.go @@ -93,13 +93,14 @@ func (c *Conntrack) Gather(acc telegraf.Accumulator) error { contents, err := ioutil.ReadFile(fName) if err != nil { - log.Printf("failed to read file '%s': %v", fName, err) + log.Printf("E! failed to read file '%s': %v", fName, err) + continue } v := strings.TrimSpace(string(contents)) fields[metricKey], err = strconv.ParseFloat(v, 64) if err != nil { - log.Printf("failed to parse metric, expected number but "+ + log.Printf("E! failed to parse metric, expected number but "+ " found '%s': %v", v, err) } } diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 41f59d2d26334..e2c488dc85cff 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -126,7 +126,7 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { defer wg.Done() err := d.gatherContainer(c, acc) if err != nil { - log.Printf("Error gathering container %s stats: %s\n", + log.Printf("E! Error gathering container %s stats: %s\n", c.Names, err.Error()) } }(container) diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 233b363c0f44b..2eeee8e75657d 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -74,7 +74,7 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error { go t.httpListen() - log.Printf("Started HTTP listener service on %s\n", t.ServiceAddress) + log.Printf("I! Started HTTP listener service on %s\n", t.ServiceAddress) return nil } @@ -89,7 +89,7 @@ func (t *HttpListener) Stop() { t.wg.Wait() - log.Println("Stopped HTTP listener service on ", t.ServiceAddress) + log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress) } // httpListen listens for HTTP requests. diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index bdfce17f981b1..52117759dd5ad 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -90,7 +90,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { case "newest": config.Offsets.Initial = sarama.OffsetNewest default: - log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", + log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", k.Offset) config.Offsets.Initial = sarama.OffsetOldest } @@ -115,7 +115,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { // Start the kafka message reader go k.receiver() - log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", + log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n", k.ZookeeperPeers, k.Topics) return nil } @@ -129,12 +129,12 @@ func (k *Kafka) receiver() { return case err := <-k.errs: if err != nil { - log.Printf("Kafka Consumer Error: %s\n", err) + log.Printf("E! Kafka Consumer Error: %s\n", err) } case msg := <-k.in: metrics, err := k.parser.Parse(msg.Value) if err != nil { - log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s", + log.Printf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s", string(msg.Value), err.Error()) } @@ -158,7 +158,7 @@ func (k *Kafka) Stop() { defer k.Unlock() close(k.done) if err := k.Consumer.Close(); err != nil { - log.Printf("Error closing kafka consumer: %s\n", err.Error()) + log.Printf("E! Error closing kafka consumer: %s\n", err.Error()) } } diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index 70b75982622e4..b2cabe642fd2d 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -202,21 +202,21 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { case INT: iv, err := strconv.ParseInt(v, 10, 64) if err != nil { - log.Printf("ERROR parsing %s to int: %s", v, err) + log.Printf("E! Error parsing %s to int: %s", v, err) } else { fields[k] = iv } case FLOAT: fv, err := strconv.ParseFloat(v, 64) if err != nil { - log.Printf("ERROR parsing %s to float: %s", v, err) + log.Printf("E! Error parsing %s to float: %s", v, err) } else { fields[k] = fv } case DURATION: d, err := time.ParseDuration(v) if err != nil { - log.Printf("ERROR parsing %s to duration: %s", v, err) + log.Printf("E! Error parsing %s to duration: %s", v, err) } else { fields[k] = int64(d) } @@ -227,14 +227,14 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { case EPOCH: iv, err := strconv.ParseInt(v, 10, 64) if err != nil { - log.Printf("ERROR parsing %s to int: %s", v, err) + log.Printf("E! Error parsing %s to int: %s", v, err) } else { timestamp = time.Unix(iv, 0) } case EPOCH_NANO: iv, err := strconv.ParseInt(v, 10, 64) if err != nil { - log.Printf("ERROR parsing %s to int: %s", v, err) + log.Printf("E! Error parsing %s to int: %s", v, err) } else { timestamp = time.Unix(0, iv) } @@ -265,7 +265,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { // if we still haven't found a timestamp layout, log it and we will // just use time.Now() if !foundTs { - log.Printf("ERROR parsing timestamp [%s], could not find any "+ + log.Printf("E! Error parsing timestamp [%s], could not find any "+ "suitable time layouts.", v) } case DROP: @@ -275,7 +275,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { if err == nil { timestamp = ts } else { - log.Printf("ERROR parsing %s to time layout [%s]: %s", v, t, err) + log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err) } } } diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 8ded03edc3ebd..0778a8a6d7ab2 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -134,7 +134,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { for _, filepath := range l.Files { g, err := globpath.Compile(filepath) if err != nil { - log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) continue } files := g.Match() @@ -167,7 +167,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) { var line *tail.Line for line = range tailer.Lines { if line.Err != nil { - log.Printf("ERROR tailing file %s, Error: %s\n", + log.Printf("E! Error tailing file %s, Error: %s\n", tailer.Filename, line.Err) continue } @@ -216,7 +216,7 @@ func (l *LogParserPlugin) Stop() { for _, t := range l.tailers { err := t.Stop() if err != nil { - log.Printf("ERROR stopping tail on file %s\n", t.Filename) + log.Printf("E! Error stopping tail on file %s\n", t.Filename) } t.Cleanup() } diff --git a/plugins/inputs/mailchimp/chimp_api.go b/plugins/inputs/mailchimp/chimp_api.go index 75c9a30d79c6b..db0004ce2264f 100644 --- a/plugins/inputs/mailchimp/chimp_api.go +++ b/plugins/inputs/mailchimp/chimp_api.go @@ -134,7 +134,7 @@ func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) { req.URL.RawQuery = params.String() req.Header.Set("User-Agent", "Telegraf-MailChimp-Plugin") if api.Debug { - log.Printf("Request URL: %s", req.URL.String()) + log.Printf("D! Request URL: %s", req.URL.String()) } resp, err := client.Do(req) @@ -148,7 +148,7 @@ func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) { return nil, err } if api.Debug { - log.Printf("Response Body:%s", string(body)) + log.Printf("D! Response Body:%s", string(body)) } if err = chimpErrorCheck(body); err != nil { diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index 31ce805f4af78..e6c68bd7dd321 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -88,7 +88,7 @@ func (m *Mesos) SetDefaults() { } if m.Timeout == 0 { - log.Println("[mesos] Missing timeout value, setting default value (100ms)") + log.Println("I! [mesos] Missing timeout value, setting default value (100ms)") m.Timeout = 100 } } @@ -383,7 +383,7 @@ func getMetrics(role Role, group string) []string { ret, ok := m[group] if !ok { - log.Printf("[mesos] Unkown %s metrics group: %s\n", role, group) + log.Printf("I! [mesos] Unkown %s metrics group: %s\n", role, group) return []string{} } diff --git a/plugins/inputs/mongodb/mongodb_server.go b/plugins/inputs/mongodb/mongodb_server.go index 863e925225c86..e843c70f0fb7c 100644 --- a/plugins/inputs/mongodb/mongodb_server.go +++ b/plugins/inputs/mongodb/mongodb_server.go @@ -47,7 +47,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error }, }, result_repl) if err != nil { - log.Println("Not gathering replica set status, member not in replica set (" + err.Error() + ")") + log.Println("E! Not gathering replica set status, member not in replica set (" + err.Error() + ")") } jumbo_chunks, _ := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count() @@ -62,7 +62,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error names := []string{} names, err = s.Session.DatabaseNames() if err != nil { - log.Println("Error getting database names (" + err.Error() + ")") + log.Println("E! Error getting database names (" + err.Error() + ")") } for _, db_name := range names { db_stat_line := &DbStatsData{} @@ -73,7 +73,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error }, }, db_stat_line) if err != nil { - log.Println("Error getting db stats from " + db_name + "(" + err.Error() + ")") + log.Println("E! Error getting db stats from " + db_name + "(" + err.Error() + ")") } db := &Db{ Name: db_name, diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index beebe00ce3ba5..cfade2944d7e1 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -133,7 +133,7 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { return nil } func (m *MQTTConsumer) onConnect(c mqtt.Client) { - log.Printf("MQTT Client Connected") + log.Printf("I! MQTT Client Connected") if !m.PersistentSession || !m.started { topics := make(map[string]byte) for _, topic := range m.Topics { @@ -142,7 +142,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) { subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) subscribeToken.Wait() if subscribeToken.Error() != nil { - log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", + log.Printf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s", strings.Join(m.Topics[:], ","), subscribeToken.Error()) } m.started = true @@ -151,7 +151,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) { } func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) { - log.Printf("MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error()) + log.Printf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error()) return } @@ -166,7 +166,7 @@ func (m *MQTTConsumer) receiver() { topic := msg.Topic() metrics, err := m.parser.Parse(msg.Payload()) if err != nil { - log.Printf("MQTT PARSE ERROR\nmessage: %s\nerror: %s", + log.Printf("E! MQTT Parse Error\nmessage: %s\nerror: %s", string(msg.Payload()), err.Error()) } diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 232d5740f61c5..a760d0362c52a 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -119,7 +119,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { // Start the message reader go n.receiver() - log.Printf("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", + log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup) return nil @@ -134,11 +134,11 @@ func (n *natsConsumer) receiver() { case <-n.done: return case err := <-n.errs: - log.Printf("error reading from %s\n", err.Error()) + log.Printf("E! error reading from %s\n", err.Error()) case msg := <-n.in: metrics, err := n.parser.Parse(msg.Data) if err != nil { - log.Printf("subject: %s, error: %s", msg.Subject, err.Error()) + log.Printf("E! subject: %s, error: %s", msg.Subject, err.Error()) } for _, metric := range metrics { @@ -157,7 +157,7 @@ func (n *natsConsumer) clean() { for _, sub := range n.Subs { if err := sub.Unsubscribe(); err != nil { - log.Printf("Error unsubscribing from subject %s in queue %s: %s\n", + log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n", sub.Subject, sub.Queue, err.Error()) } } diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go index b227b7e5029d7..d4f4e9679d884 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer.go @@ -62,7 +62,7 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error { metrics, err := n.parser.Parse(message.Body) if err != nil { - log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error()) + log.Printf("E! NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error()) return nil } for _, metric := range metrics { diff --git a/plugins/inputs/ntpq/ntpq.go b/plugins/inputs/ntpq/ntpq.go index 0bcaa04e5ac7d..674cd7216d629 100644 --- a/plugins/inputs/ntpq/ntpq.go +++ b/plugins/inputs/ntpq/ntpq.go @@ -132,7 +132,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { case strings.HasSuffix(when, "h"): m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "h")) if err != nil { - log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + log.Printf("E! Error ntpq: parsing int: %s", fields[index]) continue } // seconds in an hour @@ -141,7 +141,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { case strings.HasSuffix(when, "d"): m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "d")) if err != nil { - log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + log.Printf("E! Error ntpq: parsing int: %s", fields[index]) continue } // seconds in a day @@ -150,7 +150,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { case strings.HasSuffix(when, "m"): m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "m")) if err != nil { - log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + log.Printf("E! Error ntpq: parsing int: %s", fields[index]) continue } // seconds in a day @@ -161,7 +161,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { m, err := strconv.Atoi(fields[index]) if err != nil { - log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + log.Printf("E! Error ntpq: parsing int: %s", fields[index]) continue } mFields[key] = int64(m) @@ -178,7 +178,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { m, err := strconv.ParseFloat(fields[index], 64) if err != nil { - log.Printf("ERROR ntpq: parsing float: %s", fields[index]) + log.Printf("E! Error ntpq: parsing float: %s", fields[index]) continue } mFields[key] = m diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index ec281fca26eef..199262c0bc745 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -269,9 +269,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula fields := make(map[string]interface{}) COLUMN: for col, val := range columnMap { - if acc.Debug() { - log.Printf("postgresql_extensible: column: %s = %T: %s\n", col, *val, *val) - } + log.Printf("D! postgresql_extensible: column: %s = %T: %s\n", col, *val, *val) _, ignore := ignoredColumns[col] if ignore || *val == nil { continue diff --git a/plugins/inputs/powerdns/powerdns.go b/plugins/inputs/powerdns/powerdns.go index 75d212a9a8c64..68b1696e0611f 100644 --- a/plugins/inputs/powerdns/powerdns.go +++ b/plugins/inputs/powerdns/powerdns.go @@ -110,7 +110,7 @@ func parseResponse(metrics string) map[string]interface{} { i, err := strconv.ParseInt(m[1], 10, 64) if err != nil { - log.Printf("powerdns: Error parsing integer for metric [%s]: %s", + log.Printf("E! powerdns: Error parsing integer for metric [%s]: %s", metric, err) continue } diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index 358dc4c0f3db6..e29b5031cbc62 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -66,7 +66,7 @@ func (_ *Procstat) Description() string { func (p *Procstat) Gather(acc telegraf.Accumulator) error { err := p.createProcesses() if err != nil { - log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", + log.Printf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", p.Exe, p.PidFile, p.Pattern, p.User, err.Error()) } else { for pid, proc := range p.pidmap { diff --git a/plugins/inputs/snmp_legacy/snmp_legacy.go b/plugins/inputs/snmp_legacy/snmp_legacy.go index b8b9a12324d58..e5dbbc459dbc4 100644 --- a/plugins/inputs/snmp_legacy/snmp_legacy.go +++ b/plugins/inputs/snmp_legacy/snmp_legacy.go @@ -296,7 +296,7 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error { data, err := ioutil.ReadFile(s.SnmptranslateFile) if err != nil { - log.Printf("Reading SNMPtranslate file error: %s", err) + log.Printf("E! Reading SNMPtranslate file error: %s", err) return err } else { for _, line := range strings.Split(string(data), "\n") { @@ -394,16 +394,16 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error { // only if len(s.OidInstanceMapping) == 0 if len(host.OidInstanceMapping) >= 0 { if err := host.SNMPMap(acc, s.nameToOid, s.subTableMap); err != nil { - log.Printf("SNMP Mapping error for host '%s': %s", host.Address, err) + log.Printf("E! SNMP Mapping error for host '%s': %s", host.Address, err) continue } } // Launch Get requests if err := host.SNMPGet(acc, s.initNode); err != nil { - log.Printf("SNMP Error for host '%s': %s", host.Address, err) + log.Printf("E! SNMP Error for host '%s': %s", host.Address, err) } if err := host.SNMPBulk(acc, s.initNode); err != nil { - log.Printf("SNMP Error for host '%s': %s", host.Address, err) + log.Printf("E! SNMP Error for host '%s': %s", host.Address, err) } } return nil @@ -800,7 +800,7 @@ func (h *Host) HandleResponse( acc.AddFields(field_name, fields, tags) case gosnmp.NoSuchObject, gosnmp.NoSuchInstance: // Oid not found - log.Printf("[snmp input] Oid not found: %s", oid_key) + log.Printf("E! [snmp input] Oid not found: %s", oid_key) default: // delete other data } diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index eb02a1df220db..a46af0a871ef2 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -28,7 +28,7 @@ const ( defaultAllowPendingMessage = 10000 ) -var dropwarn = "ERROR: statsd message queue full. " + +var dropwarn = "E! Error: statsd message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" @@ -251,7 +251,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { } if s.ConvertNames { - log.Printf("WARNING statsd: convert_names config option is deprecated," + + log.Printf("I! WARNING statsd: convert_names config option is deprecated," + " please use metric_separator instead") } @@ -264,7 +264,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { go s.udpListen() // Start the line parser go s.parser() - log.Printf("Started the statsd service on %s\n", s.ServiceAddress) + log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) prevInstance = s return nil } @@ -278,7 +278,7 @@ func (s *Statsd) udpListen() error { if err != nil { log.Fatalf("ERROR: ListenUDP - %s", err) } - log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String()) + log.Println("I! Statsd listener listening on: ", s.listener.LocalAddr().String()) buf := make([]byte, UDP_MAX_PACKET_SIZE) for { @@ -288,7 +288,7 @@ func (s *Statsd) udpListen() error { default: n, _, err := s.listener.ReadFromUDP(buf) if err != nil && !strings.Contains(err.Error(), "closed network") { - log.Printf("ERROR READ: %s\n", err.Error()) + log.Printf("E! Error READ: %s\n", err.Error()) continue } bufCopy := make([]byte, n) @@ -374,7 +374,7 @@ func (s *Statsd) parseStatsdLine(line string) error { // Validate splitting the line on ":" bits := strings.Split(line, ":") if len(bits) < 2 { - log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line) + log.Printf("E! Error: splitting ':', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } @@ -390,11 +390,11 @@ func (s *Statsd) parseStatsdLine(line string) error { // Validate splitting the bit on "|" pipesplit := strings.Split(bit, "|") if len(pipesplit) < 2 { - log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line) + log.Printf("E! Error: splitting '|', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } else if len(pipesplit) > 2 { sr := pipesplit[2] - errmsg := "Error: parsing sample rate, %s, it must be in format like: " + + errmsg := "E! Error: parsing sample rate, %s, it must be in format like: " + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" if strings.Contains(sr, "@") && len(sr) > 1 { samplerate, err := strconv.ParseFloat(sr[1:], 64) @@ -414,14 +414,14 @@ func (s *Statsd) parseStatsdLine(line string) error { case "g", "c", "s", "ms", "h": m.mtype = pipesplit[1] default: - log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1]) + log.Printf("E! Error: Statsd Metric type %s unsupported", pipesplit[1]) return errors.New("Error Parsing statsd line") } // Parse the value if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") { if m.mtype != "g" { - log.Printf("Error: +- values are only supported for gauges: %s\n", line) + log.Printf("E! Error: +- values are only supported for gauges: %s\n", line) return errors.New("Error Parsing statsd line") } m.additive = true @@ -431,7 +431,7 @@ func (s *Statsd) parseStatsdLine(line string) error { case "g", "ms", "h": v, err := strconv.ParseFloat(pipesplit[0], 64) if err != nil { - log.Printf("Error: parsing value to float64: %s\n", line) + log.Printf("E! Error: parsing value to float64: %s\n", line) return errors.New("Error Parsing statsd line") } m.floatvalue = v @@ -441,7 +441,7 @@ func (s *Statsd) parseStatsdLine(line string) error { if err != nil { v2, err2 := strconv.ParseFloat(pipesplit[0], 64) if err2 != nil { - log.Printf("Error: parsing value to int64: %s\n", line) + log.Printf("E! Error: parsing value to int64: %s\n", line) return errors.New("Error Parsing statsd line") } v = int64(v2) @@ -641,7 +641,7 @@ func (s *Statsd) aggregate(m metric) { func (s *Statsd) Stop() { s.Lock() defer s.Unlock() - log.Println("Stopping the statsd service") + log.Println("I! Stopping the statsd service") close(s.done) s.listener.Close() s.wg.Wait() diff --git a/plugins/inputs/sysstat/sysstat.go b/plugins/inputs/sysstat/sysstat.go index fe7ee6b38cf3e..9c9ef6b05f347 100644 --- a/plugins/inputs/sysstat/sysstat.go +++ b/plugins/inputs/sysstat/sysstat.go @@ -203,7 +203,7 @@ func (s *Sysstat) collect() error { out, err := internal.CombinedOutputTimeout(cmd, time.Second*time.Duration(collectInterval+parseInterval)) if err != nil { if err := os.Remove(s.tmpFile); err != nil { - log.Printf("failed to remove tmp file after %s command: %s", strings.Join(cmd.Args, " "), err) + log.Printf("E! failed to remove tmp file after %s command: %s", strings.Join(cmd.Args, " "), err) } return fmt.Errorf("failed to run command %s: %s - %s", strings.Join(cmd.Args, " "), err, string(out)) } diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go index 1f77ae57d38dd..0950323fde8db 100644 --- a/plugins/inputs/system/processes.go +++ b/plugins/inputs/system/processes.go @@ -118,7 +118,7 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error { case '?': fields["unknown"] = fields["unknown"].(int64) + int64(1) default: - log.Printf("processes: Unknown state [ %s ] from ps", + log.Printf("I! processes: Unknown state [ %s ] from ps", string(status[0])) } fields["total"] = fields["total"].(int64) + int64(1) @@ -169,14 +169,14 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error { case 'W': fields["paging"] = fields["paging"].(int64) + int64(1) default: - log.Printf("processes: Unknown state [ %s ] in file %s", + log.Printf("I! processes: Unknown state [ %s ] in file %s", string(stats[0][0]), filename) } fields["total"] = fields["total"].(int64) + int64(1) threads, err := strconv.Atoi(string(stats[17])) if err != nil { - log.Printf("processes: Error parsing thread count: %s", err) + log.Printf("I! processes: Error parsing thread count: %s", err) continue } fields["total_threads"] = fields["total_threads"].(int64) + int64(threads) diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 942fd6bae7df0..e1bc32e51f0c6 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -81,7 +81,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { for _, filepath := range t.Files { g, err := globpath.Compile(filepath) if err != nil { - log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) } for file, _ := range g.Match() { tailer, err := tail.TailFile(file, @@ -118,7 +118,7 @@ func (t *Tail) receiver(tailer *tail.Tail) { var line *tail.Line for line = range tailer.Lines { if line.Err != nil { - log.Printf("ERROR tailing file %s, Error: %s\n", + log.Printf("E! Error tailing file %s, Error: %s\n", tailer.Filename, err) continue } @@ -126,7 +126,7 @@ func (t *Tail) receiver(tailer *tail.Tail) { if err == nil { t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } else { - log.Printf("Malformed log line in %s: [%s], Error: %s\n", + log.Printf("E! Malformed log line in %s: [%s], Error: %s\n", tailer.Filename, line.Text, err) } } @@ -139,7 +139,7 @@ func (t *Tail) Stop() { for _, t := range t.tailers { err := t.Stop() if err != nil { - log.Printf("ERROR stopping tail on file %s\n", t.Filename) + log.Printf("E! Error stopping tail on file %s\n", t.Filename) } t.Cleanup() } diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index b8bea2bd6c265..a8827c037fb29 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -43,11 +43,11 @@ type TcpListener struct { acc telegraf.Accumulator } -var dropwarn = "ERROR: tcp_listener message queue full. " + +var dropwarn = "E! Error: tcp_listener message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" -var malformedwarn = "WARNING: tcp_listener has received %d malformed packets" + +var malformedwarn = "E! tcp_listener has received %d malformed packets" + " thus far." const sampleConfig = ` @@ -108,13 +108,13 @@ func (t *TcpListener) Start(acc telegraf.Accumulator) error { log.Fatalf("ERROR: ListenUDP - %s", err) return err } - log.Println("TCP server listening on: ", t.listener.Addr().String()) + log.Println("I! TCP server listening on: ", t.listener.Addr().String()) t.wg.Add(2) go t.tcpListen() go t.tcpParser() - log.Printf("Started TCP listener service on %s\n", t.ServiceAddress) + log.Printf("I! Started TCP listener service on %s\n", t.ServiceAddress) return nil } @@ -141,7 +141,7 @@ func (t *TcpListener) Stop() { t.wg.Wait() close(t.in) - log.Println("Stopped TCP listener service on ", t.ServiceAddress) + log.Println("I! Stopped TCP listener service on ", t.ServiceAddress) } // tcpListen listens for incoming TCP connections. @@ -182,8 +182,8 @@ func (t *TcpListener) refuser(conn *net.TCPConn) { " reached, closing.\nYou may want to increase max_tcp_connections in"+ " the Telegraf tcp listener configuration.\n", t.MaxTCPConnections) conn.Close() - log.Printf("Refused TCP Connection from %s", conn.RemoteAddr()) - log.Printf("WARNING: Maximum TCP Connections reached, you may want to" + + log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr()) + log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" + " adjust max_tcp_connections") } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index fa773f624106c..d2c4d0bbc5eff 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -42,11 +42,11 @@ type UdpListener struct { // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure const UDP_MAX_PACKET_SIZE int = 64 * 1024 -var dropwarn = "ERROR: udp_listener message queue full. " + +var dropwarn = "E! Error: udp_listener message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" -var malformedwarn = "WARNING: udp_listener has received %d malformed packets" + +var malformedwarn = "E! udp_listener has received %d malformed packets" + " thus far." const sampleConfig = ` @@ -94,7 +94,7 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { go u.udpListen() go u.udpParser() - log.Printf("Started UDP listener service on %s\n", u.ServiceAddress) + log.Printf("I! Started UDP listener service on %s\n", u.ServiceAddress) return nil } @@ -105,7 +105,7 @@ func (u *UdpListener) Stop() { u.wg.Wait() u.listener.Close() close(u.in) - log.Println("Stopped UDP listener service on ", u.ServiceAddress) + log.Println("I! Stopped UDP listener service on ", u.ServiceAddress) } func (u *UdpListener) udpListen() error { @@ -116,7 +116,7 @@ func (u *UdpListener) udpListen() error { if err != nil { log.Fatalf("ERROR: ListenUDP - %s", err) } - log.Println("UDP server listening on: ", u.listener.LocalAddr().String()) + log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String()) buf := make([]byte, UDP_MAX_PACKET_SIZE) for { @@ -129,7 +129,7 @@ func (u *UdpListener) udpListen() error { if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { } else { - log.Printf("ERROR: %s\n", err.Error()) + log.Printf("E! Error: %s\n", err.Error()) } continue } diff --git a/plugins/inputs/webhooks/filestack/filestack_webhooks.go b/plugins/inputs/webhooks/filestack/filestack_webhooks.go index 623737670c698..19f8c0251bbb7 100644 --- a/plugins/inputs/webhooks/filestack/filestack_webhooks.go +++ b/plugins/inputs/webhooks/filestack/filestack_webhooks.go @@ -19,7 +19,7 @@ type FilestackWebhook struct { func (fs *FilestackWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { router.HandleFunc(fs.Path, fs.eventHandler).Methods("POST") - log.Printf("Started the webhooks_filestack on %s\n", fs.Path) + log.Printf("I! Started the webhooks_filestack on %s\n", fs.Path) fs.acc = acc } diff --git a/plugins/inputs/webhooks/github/github_webhooks.go b/plugins/inputs/webhooks/github/github_webhooks.go index 5327363f4551c..139c7697104de 100644 --- a/plugins/inputs/webhooks/github/github_webhooks.go +++ b/plugins/inputs/webhooks/github/github_webhooks.go @@ -17,7 +17,7 @@ type GithubWebhook struct { func (gh *GithubWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { router.HandleFunc(gh.Path, gh.eventHandler).Methods("POST") - log.Printf("Started the webhooks_github on %s\n", gh.Path) + log.Printf("I! Started the webhooks_github on %s\n", gh.Path) gh.acc = acc } @@ -58,7 +58,7 @@ func (e *newEventError) Error() string { } func NewEvent(data []byte, name string) (Event, error) { - log.Printf("New %v event received", name) + log.Printf("D! New %v event received", name) switch name { case "commit_comment": return generateEvent(data, &CommitCommentEvent{}) diff --git a/plugins/inputs/webhooks/mandrill/mandrill_webhooks.go b/plugins/inputs/webhooks/mandrill/mandrill_webhooks.go index e9d4a6de49144..4a14c88947f97 100644 --- a/plugins/inputs/webhooks/mandrill/mandrill_webhooks.go +++ b/plugins/inputs/webhooks/mandrill/mandrill_webhooks.go @@ -21,7 +21,7 @@ func (md *MandrillWebhook) Register(router *mux.Router, acc telegraf.Accumulator router.HandleFunc(md.Path, md.returnOK).Methods("HEAD") router.HandleFunc(md.Path, md.eventHandler).Methods("POST") - log.Printf("Started the webhooks_mandrill on %s\n", md.Path) + log.Printf("I! Started the webhooks_mandrill on %s\n", md.Path) md.acc = acc } diff --git a/plugins/inputs/webhooks/rollbar/rollbar_webhooks.go b/plugins/inputs/webhooks/rollbar/rollbar_webhooks.go index 8b8dada50afdc..6b6f0965c611e 100644 --- a/plugins/inputs/webhooks/rollbar/rollbar_webhooks.go +++ b/plugins/inputs/webhooks/rollbar/rollbar_webhooks.go @@ -19,7 +19,7 @@ type RollbarWebhook struct { func (rb *RollbarWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST") - log.Printf("Started the webhooks_rollbar on %s\n", rb.Path) + log.Printf("I! Started the webhooks_rollbar on %s\n", rb.Path) rb.acc = acc } diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go index 592656a141140..fcddbebd7f6e3 100644 --- a/plugins/inputs/webhooks/webhooks.go +++ b/plugins/inputs/webhooks/webhooks.go @@ -73,7 +73,7 @@ func (wb *Webhooks) Listen(acc telegraf.Accumulator) { err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r) if err != nil { - log.Printf("Error starting server: %v", err) + log.Printf("E! Error starting server: %v", err) } } @@ -100,10 +100,10 @@ func (wb *Webhooks) AvailableWebhooks() []Webhook { func (wb *Webhooks) Start(acc telegraf.Accumulator) error { go wb.Listen(acc) - log.Printf("Started the webhooks service on %s\n", wb.ServiceAddress) + log.Printf("I! Started the webhooks service on %s\n", wb.ServiceAddress) return nil } func (rb *Webhooks) Stop() { - log.Println("Stopping the Webhooks service") + log.Println("I! Stopping the Webhooks service") } diff --git a/plugins/outputs/amon/amon.go b/plugins/outputs/amon/amon.go index f88c2ddc594aa..a113f2616c042 100644 --- a/plugins/outputs/amon/amon.go +++ b/plugins/outputs/amon/amon.go @@ -73,7 +73,7 @@ func (a *Amon) Write(metrics []telegraf.Metric) error { metricCounter++ } } else { - log.Printf("unable to build Metric for %s, skipping\n", m.Name()) + log.Printf("I! unable to build Metric for %s, skipping\n", m.Name()) } } diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index bf9353d6e396f..00cc1a39df072 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -153,10 +153,10 @@ func (q *AMQP) Connect() error { } q.channel = channel go func() { - log.Printf("Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error))) - log.Printf("Trying to reconnect") + log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error))) + log.Printf("I! Trying to reconnect") for err := q.Connect(); err != nil; err = q.Connect() { - log.Println(err) + log.Println("E! ", err.Error()) time.Sleep(10 * time.Second) } diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index e143c23aa850e..045dae462f7ec 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -80,7 +80,7 @@ func (c *CloudWatch) Connect() error { _, err := svc.ListMetrics(params) // Try a read-only call to test connection. if err != nil { - log.Printf("cloudwatch: Error in ListMetrics API call : %+v \n", err.Error()) + log.Printf("E! cloudwatch: Error in ListMetrics API call : %+v \n", err.Error()) } c.svc = svc @@ -131,7 +131,7 @@ func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error { _, err := c.svc.PutMetricData(params) if err != nil { - log.Printf("CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error()) + log.Printf("E! CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error()) } return err diff --git a/plugins/outputs/datadog/datadog.go b/plugins/outputs/datadog/datadog.go index 0885687181c09..cf54de725eda9 100644 --- a/plugins/outputs/datadog/datadog.go +++ b/plugins/outputs/datadog/datadog.go @@ -92,7 +92,7 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error { metricCounter++ } } else { - log.Printf("unable to build Metric for %s, skipping\n", m.Name()) + log.Printf("I! unable to build Metric for %s, skipping\n", m.Name()) } } diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index fb95aff83438c..c78b742755823 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -85,7 +85,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { gMetrics, err := s.Serialize(metric) if err != nil { - log.Printf("Error serializing some metrics to graphite: %s", err.Error()) + log.Printf("E! Error serializing some metrics to graphite: %s", err.Error()) } bp = append(bp, gMetrics...) } @@ -102,7 +102,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { } if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil { // Error - log.Println("ERROR: " + e.Error()) + log.Println("E! Graphite Error: " + e.Error()) // Let's try the next one } else { // Success diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 1d6110b34612e..8c23b2c5acf58 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -130,7 +130,7 @@ func (i *InfluxDB) Connect() error { err = createDatabase(c, i.Database) if err != nil { - log.Println("Database creation failed: " + err.Error()) + log.Println("E! Database creation failed: " + err.Error()) continue } @@ -201,11 +201,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { for _, n := range p { if e := i.conns[n].Write(bp); e != nil { // Log write failure - log.Printf("ERROR: %s", e) + log.Printf("E! InfluxDB Output Error: %s", e) // If the database was not found, try to recreate it if strings.Contains(e.Error(), "database not found") { if errc := createDatabase(i.conns[n], i.Database); errc != nil { - log.Printf("ERROR: Database %s not found and failed to recreate\n", + log.Printf("E! Error: Database %s not found and failed to recreate\n", i.Database) } } diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index 88cdc1ebae0fb..ac8ac57b2e717 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -119,7 +119,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { stats, err := s.Serialize(toSerialize) if err != nil { - log.Printf("Error serializing a metric to Instrumental: %s", err) + log.Printf("E! Error serializing a metric to Instrumental: %s", err) } switch metricType { @@ -144,7 +144,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { if !ValueIncludesBadChar.MatchString(value) { points = append(points, fmt.Sprintf("%s %s %s %s", metricType, clean_metric, value, time)) } else if i.Debug { - log.Printf("Unable to send bad stat: %s", stat) + log.Printf("E! Instrumental unable to send bad stat: %s", stat) } } } @@ -152,9 +152,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { allPoints := strings.Join(points, "\n") + "\n" _, err = fmt.Fprintf(i.conn, allPoints) - if i.Debug { - log.Println(allPoints) - } + log.Println("D! Instrumental: " + allPoints) if err != nil { if err == io.EOF { diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index b0fb566556afd..a30ab88018eff 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -83,7 +83,7 @@ func (k *KinesisOutput) Connect() error { // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // environment variables, and then Shared Credentials. if k.Debug { - log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region) + log.Printf("E! kinesis: Establishing a connection to Kinesis in %+v", k.Region) } credentialConfig := &internalaws.CredentialConfig{ @@ -105,17 +105,17 @@ func (k *KinesisOutput) Connect() error { resp, err := svc.ListStreams(KinesisParams) if err != nil { - log.Printf("kinesis: Error in ListSteams API call : %+v \n", err) + log.Printf("E! kinesis: Error in ListSteams API call : %+v \n", err) } if checkstream(resp.StreamNames, k.StreamName) { if k.Debug { - log.Printf("kinesis: Stream Exists") + log.Printf("E! kinesis: Stream Exists") } k.svc = svc return nil } else { - log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) + log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) os.Exit(1) } return err @@ -147,14 +147,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du if k.Debug { resp, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("E! kinesis: Unable to write to Kinesis : %+v \n", err.Error()) } - log.Printf("%+v \n", resp) + log.Printf("E! %+v \n", resp) } else { _, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("E! kinesis: Unable to write to Kinesis : %+v \n", err.Error()) } } return time.Since(start) @@ -182,7 +182,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { if sz == 500 { // Max Messages Per PutRecordRequest is 500 elapsed := writekinesis(k, r) - log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) + log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) atomic.StoreUint32(&sz, 0) r = nil } diff --git a/plugins/outputs/librato/librato.go b/plugins/outputs/librato/librato.go index 17d0d4c6ab6f5..3c4cb6d2acabf 100644 --- a/plugins/outputs/librato/librato.go +++ b/plugins/outputs/librato/librato.go @@ -103,15 +103,13 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { if gauges, err := l.buildGauges(m); err == nil { for _, gauge := range gauges { tempGauges = append(tempGauges, gauge) - if l.Debug { - log.Printf("[DEBUG] Got a gauge: %v\n", gauge) - } + log.Printf("D! Got a gauge: %v\n", gauge) + } } else { - log.Printf("unable to build Gauge for %s, skipping\n", m.Name()) - if l.Debug { - log.Printf("[DEBUG] Couldn't build gauge: %v\n", err) - } + log.Printf("I! unable to build Gauge for %s, skipping\n", m.Name()) + log.Printf("D! Couldn't build gauge: %v\n", err) + } } @@ -132,9 +130,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) } - if l.Debug { - log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes)) - } + log.Printf("D! Librato request: %v\n", string(metricsBytes)) req, err := http.NewRequest( "POST", @@ -150,9 +146,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { resp, err := l.client.Do(req) if err != nil { - if l.Debug { - log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error()) - } + log.Printf("D! Error POSTing metrics: %v\n", err.Error()) return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) } defer resp.Body.Close() @@ -160,7 +154,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { if resp.StatusCode != 200 || l.Debug { htmlData, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Printf("[DEBUG] Couldn't get response! (%v)\n", err) + log.Printf("D! Couldn't get response! (%v)\n", err) } if resp.StatusCode != 200 { return fmt.Errorf( @@ -168,9 +162,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { resp.StatusCode, string(htmlData)) } - if l.Debug { - log.Printf("[DEBUG] Librato response: %v\n", string(htmlData)) - } + log.Printf("D! Librato response: %v\n", string(htmlData)) } } @@ -226,9 +218,8 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { } gauges = append(gauges, gauge) } - if l.Debug { - fmt.Printf("[DEBUG] Built gauges: %v\n", gauges) - } + + log.Printf("D! Built gauges: %v\n", gauges) return gauges, nil } diff --git a/plugins/outputs/opentsdb/opentsdb_http.go b/plugins/outputs/opentsdb/opentsdb_http.go index 27e4afdda4188..f347d5e604f41 100644 --- a/plugins/outputs/opentsdb/opentsdb_http.go +++ b/plugins/outputs/opentsdb/opentsdb_http.go @@ -164,9 +164,11 @@ func (o *openTSDBHttp) flush() error { if resp.StatusCode/100 != 2 { if resp.StatusCode/100 == 4 { - log.Printf("WARNING: Received %d status code. Dropping metrics to avoid overflowing buffer.", resp.StatusCode) + log.Printf("E! Received %d status code. Dropping metrics to avoid overflowing buffer.", + resp.StatusCode) } else { - return fmt.Errorf("Error when sending metrics.Received status %d", resp.StatusCode) + return fmt.Errorf("Error when sending metrics. Received status %d", + resp.StatusCode) } } diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 0598a5a41278e..fc89266020c67 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -167,7 +167,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { continue } if err != nil { - log.Printf("ERROR creating prometheus metric, "+ + log.Printf("E! Error creating prometheus metric, "+ "key: %s, labels: %v,\nerr: %s\n", mname, l, err.Error()) }