Skip to content

Commit

Permalink
Add ability to label inputs for logging (influxdata#6207)
Browse files Browse the repository at this point in the history
  • Loading branch information
glinton authored and Mathieu Lecarme committed Apr 17, 2020
1 parent 4efca27 commit f7f5f1a
Show file tree
Hide file tree
Showing 22 changed files with 475 additions and 103 deletions.
4 changes: 2 additions & 2 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
)

type MetricMaker interface {
Name() string
LogName() string
MakeMetric(metric telegraf.Metric) telegraf.Metric
}

Expand Down Expand Up @@ -111,7 +111,7 @@ func (ac *accumulator) AddError(err error) {
return
}
NErrors.Incr(1)
log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err)
log.Printf("E! [%s] Error in plugin: %v", ac.maker.LogName(), err)
}

func (ac *accumulator) SetPrecision(precision time.Duration) {
Expand Down
4 changes: 4 additions & 0 deletions agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (tm *TestMetricMaker) Name() string {
return "TestPlugin"
}

func (tm *TestMetricMaker) LogName() string {
return tm.Name()
}

func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}
30 changes: 15 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Name() {
switch input.Config.Name {
case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Precision())
Expand Down Expand Up @@ -337,8 +337,8 @@ func (a *Agent) gatherOnce(
case err := <-done:
return err
case <-ticker.C:
log.Printf("W! [agent] input %q did not complete within its interval",
input.Name())
log.Printf("W! [agent] [%s] did not complete within its interval",
input.LogName())
}
}
}
Expand Down Expand Up @@ -551,7 +551,7 @@ func (a *Agent) flush(

logError := func(err error) {
if err != nil {
log.Printf("E! [agent] Error writing to output [%s]: %v", output.Name, err)
log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err)
}
}

Expand Down Expand Up @@ -603,8 +603,8 @@ func (a *Agent) flushOnce(
output.LogBufferStatus()
return err
case <-ticker.C:
log.Printf("W! [agent] output %q did not complete within its flush interval",
output.Name)
log.Printf("W! [agent] [%q] did not complete within its flush interval",
output.LogName())
output.LogBufferStatus()
}
}
Expand All @@ -617,7 +617,7 @@ func (a *Agent) initPlugins() error {
err := input.Init()
if err != nil {
return fmt.Errorf("could not initialize input %s: %v",
input.Config.Name, err)
input.LogName(), err)
}
}
for _, processor := range a.Config.Processors {
Expand Down Expand Up @@ -647,11 +647,11 @@ func (a *Agent) initPlugins() error {
// connectOutputs connects to all outputs.
func (a *Agent) connectOutputs(ctx context.Context) error {
for _, output := range a.Config.Outputs {
log.Printf("D! [agent] Attempting connection to output: %s\n", output.Name)
log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
err := output.Output.Connect()
if err != nil {
log.Printf("E! [agent] Failed to connect to output %s, retrying in 15s, "+
"error was '%s' \n", output.Name, err)
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
"error was '%s'", output.LogName(), err)

err := internal.SleepContext(ctx, 15*time.Second)
if err != nil {
Expand All @@ -663,7 +663,7 @@ func (a *Agent) connectOutputs(ctx context.Context) error {
return err
}
}
log.Printf("D! [agent] Successfully connected to output: %s\n", output.Name)
log.Printf("D! [agent] Successfully connected to %s", output.LogName())
}
return nil
}
Expand Down Expand Up @@ -693,8 +693,8 @@ func (a *Agent) startServiceInputs(

err := si.Start(acc)
if err != nil {
log.Printf("E! [agent] Service for input %s failed to start: %v",
input.Name(), err)
log.Printf("E! [agent] Service for [%s] failed to start: %v",
input.LogName(), err)

for _, si := range started {
si.Stop()
Expand Down Expand Up @@ -745,8 +745,8 @@ func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil {
trace := make([]byte, 2048)
runtime.Stack(trace, true)
log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
input.Name(), err, trace)
log.Printf("E! FATAL: [%s] panicked: %s, Stack:\n%s",
input.LogName(), err, trace)
log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
"stack trace, configuration, and OS information: " +
"https://github.com/influxdata/telegraf/issues/new/choose")
Expand Down
9 changes: 0 additions & 9 deletions input.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
package telegraf

// Initializer is an interface that all plugin types: Inputs, Outputs,
// Processors, and Aggregators can optionally implement to initialize the
// plugin.
type Initializer interface {
// Init performs one time setup of the plugin and returns an error if the
// configuration is invalid.
Init() error
}

type Input interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string
Expand Down
46 changes: 39 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (c *Config) AggregatorNames() []string {
func (c *Config) ProcessorNames() []string {
var name []string
for _, processor := range c.Processors {
name = append(name, processor.Name)
name = append(name, processor.Config.Name)
}
return name
}
Expand All @@ -196,7 +196,7 @@ func (c *Config) ProcessorNames() []string {
func (c *Config) OutputNames() []string {
var name []string
for _, output := range c.Outputs {
name = append(name, output.Name)
name = append(name, output.Config.Name)
}
return name
}
Expand Down Expand Up @@ -920,11 +920,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
return err
}

rf := &models.RunningProcessor{
Name: name,
Processor: processor,
Config: processorConfig,
}
rf := models.NewRunningProcessor(processor, processorConfig)

c.Processors = append(c.Processors, rf)
return nil
Expand Down Expand Up @@ -1103,6 +1099,14 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
conf.Alias = str.Value
}
}
}

conf.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
Expand All @@ -1119,6 +1123,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "alias")
delete(tbl.Fields, "tags")
var err error
conf.Filter, err = buildFilter(tbl)
Expand Down Expand Up @@ -1146,6 +1151,15 @@ func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
conf.Alias = str.Value
}
}
}

delete(tbl.Fields, "alias")
delete(tbl.Fields, "order")
var err error
conf.Filter, err = buildFilter(tbl)
Expand Down Expand Up @@ -1334,6 +1348,14 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.Alias = str.Value
}
}
}

cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
Expand All @@ -1346,6 +1368,7 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "alias")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
var err error
Expand Down Expand Up @@ -2007,9 +2030,18 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
oc.Alias = str.Value
}
}
}

delete(tbl.Fields, "flush_interval")
delete(tbl.Fields, "metric_buffer_limit")
delete(tbl.Fields, "metric_batch_size")
delete(tbl.Fields, "alias")

return oc, nil
}
87 changes: 87 additions & 0 deletions internal/models/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package models

import (
"log"
"reflect"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

// Logger defines a logging structure for plugins.
type Logger struct {
Errs selfstat.Stat
Name string // Name is the plugin name, will be printed in the `[]`.
}

// Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) {
l.Errs.Incr(1)
log.Printf("E! ["+l.Name+"] "+format, args...)
}

// Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) {
l.Errs.Incr(1)
log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...)
}

// Debugf logs a debug message, patterned after log.Printf.
func (l *Logger) Debugf(format string, args ...interface{}) {
log.Printf("D! ["+l.Name+"] "+format, args...)
}

// Debug logs a debug message, patterned after log.Print.
func (l *Logger) Debug(args ...interface{}) {
log.Print(append([]interface{}{"D! [" + l.Name + "] "}, args...)...)
}

// Warnf logs a warning message, patterned after log.Printf.
func (l *Logger) Warnf(format string, args ...interface{}) {
log.Printf("W! ["+l.Name+"] "+format, args...)
}

// Warn logs a warning message, patterned after log.Print.
func (l *Logger) Warn(args ...interface{}) {
log.Print(append([]interface{}{"W! [" + l.Name + "] "}, args...)...)
}

// Infof logs an information message, patterned after log.Printf.
func (l *Logger) Infof(format string, args ...interface{}) {
log.Printf("I! ["+l.Name+"] "+format, args...)
}

// Info logs an information message, patterned after log.Print.
func (l *Logger) Info(args ...interface{}) {
log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...)
}

// logName returns the log-friendly name/type.
func logName(pluginType, name, alias string) string {
if alias == "" {
return pluginType + "." + name
}
return pluginType + "." + name + "::" + alias
}

func setLogIfExist(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
valI = reflect.New(reflect.TypeOf(i))
}

field := valI.Elem().FieldByName("Log")
if !field.IsValid() {
return
}

switch field.Type().String() {
case "telegraf.Logger":
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
}

return
}
Loading

0 comments on commit f7f5f1a

Please sign in to comment.