Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to label inputs for logging #6207

Merged
merged 16 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
)

type MetricMaker interface {
Name() string
LogName() string
MakeMetric(metric telegraf.Metric) telegraf.Metric
}

Expand Down Expand Up @@ -111,7 +111,7 @@ func (ac *accumulator) AddError(err error) {
return
}
NErrors.Incr(1)
log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err)
log.Printf("E! [%s] Error in plugin: %v", ac.maker.LogName(), err)
}

func (ac *accumulator) SetPrecision(precision time.Duration) {
Expand Down
4 changes: 4 additions & 0 deletions agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (tm *TestMetricMaker) Name() string {
return "TestPlugin"
}

func (tm *TestMetricMaker) LogName() string {
return tm.Name()
}

func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}
30 changes: 15 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Name() {
switch input.Config.Name {
case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Precision())
Expand Down Expand Up @@ -334,8 +334,8 @@ func (a *Agent) gatherOnce(
case err := <-done:
return err
case <-ticker.C:
log.Printf("W! [agent] input %q did not complete within its interval",
input.Name())
log.Printf("W! [agent] [%s] did not complete within its interval",
input.LogName())
}
}
}
Expand Down Expand Up @@ -548,7 +548,7 @@ func (a *Agent) flush(

logError := func(err error) {
if err != nil {
log.Printf("E! [agent] Error writing to output [%s]: %v", output.Name, err)
log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err)
}
}

Expand Down Expand Up @@ -600,8 +600,8 @@ func (a *Agent) flushOnce(
output.LogBufferStatus()
return err
case <-ticker.C:
log.Printf("W! [agent] output %q did not complete within its flush interval",
output.Name)
log.Printf("W! [agent] [%q] did not complete within its flush interval",
output.LogName())
output.LogBufferStatus()
}
}
Expand All @@ -614,7 +614,7 @@ func (a *Agent) initPlugins() error {
err := input.Init()
if err != nil {
return fmt.Errorf("could not initialize input %s: %v",
input.Config.Name, err)
input.LogName(), err)
}
}
for _, processor := range a.Config.Processors {
Expand Down Expand Up @@ -644,11 +644,11 @@ func (a *Agent) initPlugins() error {
// connectOutputs connects to all outputs.
func (a *Agent) connectOutputs(ctx context.Context) error {
for _, output := range a.Config.Outputs {
log.Printf("D! [agent] Attempting connection to output: %s\n", output.Name)
log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
err := output.Output.Connect()
if err != nil {
log.Printf("E! [agent] Failed to connect to output %s, retrying in 15s, "+
"error was '%s' \n", output.Name, err)
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
"error was '%s'", output.LogName(), err)

err := internal.SleepContext(ctx, 15*time.Second)
if err != nil {
Expand All @@ -660,7 +660,7 @@ func (a *Agent) connectOutputs(ctx context.Context) error {
return err
}
}
log.Printf("D! [agent] Successfully connected to output: %s\n", output.Name)
log.Printf("D! [agent] Successfully connected to %s", output.LogName())
glinton marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
Expand Down Expand Up @@ -690,8 +690,8 @@ func (a *Agent) startServiceInputs(

err := si.Start(acc)
if err != nil {
log.Printf("E! [agent] Service for input %s failed to start: %v",
input.Name(), err)
log.Printf("E! [agent] Service for [%s] failed to start: %v",
input.LogName(), err)

for _, si := range started {
si.Stop()
Expand Down Expand Up @@ -742,8 +742,8 @@ func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil {
trace := make([]byte, 2048)
runtime.Stack(trace, true)
log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
input.Name(), err, trace)
log.Printf("E! FATAL: [%s] panicked: %s, Stack:\n%s",
input.LogName(), err, trace)
log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
"stack trace, configuration, and OS information: " +
"https://github.com/influxdata/telegraf/issues/new/choose")
Expand Down
9 changes: 0 additions & 9 deletions input.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
package telegraf

// Initializer is an interface that all plugin types: Inputs, Outputs,
// Processors, and Aggregators can optionally implement to initialize the
// plugin.
type Initializer interface {
// Init performs one time setup of the plugin and returns an error if the
// configuration is invalid.
Init() error
}

type Input interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string
Expand Down
46 changes: 39 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (c *Config) AggregatorNames() []string {
func (c *Config) ProcessorNames() []string {
var name []string
for _, processor := range c.Processors {
name = append(name, processor.Name)
name = append(name, processor.Config.Name)
}
return name
}
Expand All @@ -196,7 +196,7 @@ func (c *Config) ProcessorNames() []string {
func (c *Config) OutputNames() []string {
var name []string
for _, output := range c.Outputs {
name = append(name, output.Name)
name = append(name, output.Config.Name)
}
return name
}
Expand Down Expand Up @@ -920,11 +920,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
return err
}

rf := &models.RunningProcessor{
Name: name,
Processor: processor,
Config: processorConfig,
}
rf := models.NewRunningProcessor(processor, processorConfig)

c.Processors = append(c.Processors, rf)
return nil
Expand Down Expand Up @@ -1103,6 +1099,14 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
conf.Alias = str.Value
}
}
}

conf.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
Expand All @@ -1119,6 +1123,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "alias")
delete(tbl.Fields, "tags")
var err error
conf.Filter, err = buildFilter(tbl)
Expand Down Expand Up @@ -1146,6 +1151,15 @@ func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
conf.Alias = str.Value
}
}
}

delete(tbl.Fields, "alias")
delete(tbl.Fields, "order")
var err error
conf.Filter, err = buildFilter(tbl)
Expand Down Expand Up @@ -1334,6 +1348,14 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.Alias = str.Value
}
}
}

cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
Expand All @@ -1346,6 +1368,7 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "alias")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
var err error
Expand Down Expand Up @@ -2007,9 +2030,18 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
}
}

if node, ok := tbl.Fields["alias"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
oc.Alias = str.Value
}
}
}

delete(tbl.Fields, "flush_interval")
delete(tbl.Fields, "metric_buffer_limit")
delete(tbl.Fields, "metric_batch_size")
delete(tbl.Fields, "alias")

return oc, nil
}
87 changes: 87 additions & 0 deletions internal/models/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package models

import (
"log"
"reflect"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

// Logger defines a logging structure for plugins.
type Logger struct {
Errs selfstat.Stat
Name string // Name is the plugin name, will be printed in the `[]`.
}

// Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) {
l.Errs.Incr(1)
log.Printf("E! ["+l.Name+"] "+format, args...)
}

// Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) {
l.Errs.Incr(1)
log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...)
}

// Debugf logs a debug message, patterned after log.Printf.
func (l *Logger) Debugf(format string, args ...interface{}) {
log.Printf("D! ["+l.Name+"] "+format, args...)
}

// Debug logs a debug message, patterned after log.Print.
func (l *Logger) Debug(args ...interface{}) {
log.Print(append([]interface{}{"D! [" + l.Name + "] "}, args...)...)
}

// Warnf logs a warning message, patterned after log.Printf.
func (l *Logger) Warnf(format string, args ...interface{}) {
log.Printf("W! ["+l.Name+"] "+format, args...)
}

// Warn logs a warning message, patterned after log.Print.
func (l *Logger) Warn(args ...interface{}) {
log.Print(append([]interface{}{"W! [" + l.Name + "] "}, args...)...)
}

// Infof logs an information message, patterned after log.Printf.
func (l *Logger) Infof(format string, args ...interface{}) {
log.Printf("I! ["+l.Name+"] "+format, args...)
}

// Info logs an information message, patterned after log.Print.
func (l *Logger) Info(args ...interface{}) {
log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...)
}

// logName returns the log-friendly name/type.
func logName(pluginType, name, alias string) string {
if alias == "" {
return pluginType + "." + name
}
return pluginType + "." + name + "::" + alias
}

func setLogIfExist(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
valI = reflect.New(reflect.TypeOf(i))
}

field := valI.Elem().FieldByName("Log")
if !field.IsValid() {
return
}

switch field.Type().String() {
case "telegraf.Logger":
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
}

return
}
Loading