Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): add ignore_error_inputs option for inputs #11313

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,31 @@ func (a *Agent) Run(ctx context.Context) error {
return err
}

// isIgnoreInput reports whether an initialization error from errInput should
// be tolerated. It returns true when either the input's own ignore_init_error
// option or the agent-wide ignore_error_inputs option is enabled.
func (a *Agent) isIgnoreInput(errInput *models.RunningInput) bool {
	// The per-input flag is checked first; the agent-wide flag is only
	// consulted when the input has not opted in itself.
	if errInput.Config.IgnoreInitError {
		return true
	}
	return a.Config.Agent.IgnoreErrorInputs
}

// initPlugins runs the Init function on plugins.
func (a *Agent) initPlugins() error {
inputs := make([]*models.RunningInput, 0)
for _, input := range a.Config.Inputs {
// Share the snmp translator setting with plugins that need it.
if tp, ok := input.Input.(snmp.TranslatorPlugin); ok {
tp.SetTranslator(a.Config.Agent.SnmpTranslator)
}
err := input.Init()
if err != nil {
if err != nil && a.isIgnoreInput(input) {
log.Printf("W! [agent] Ignore initialize error input %s: %v", input.LogName(), err)
continue
} else if err != nil {
return fmt.Errorf("could not initialize input %s: %v",
input.LogName(), err)
}
inputs = append(inputs, input)
}
a.Config.Inputs = inputs
for _, parser := range a.Config.Parsers {
err := parser.Init()
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package agent

import (
"fmt"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/models"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
"github.com/stretchr/testify/assert"
Expand All @@ -19,6 +22,76 @@ func TestAgent_OmitHostname(t *testing.T) {
assert.NotContains(t, c.Tags, "host")
}

// TestAgent_isIgnoreInput checks that isIgnoreInput returns true when either
// the per-input ignore_init_error flag or the agent-wide ignore_error_inputs
// flag is set, and false only when both are unset.
func TestAgent_isIgnoreInput(t *testing.T) {
	c := config.NewConfig()
	assert.False(t, c.Agent.IgnoreErrorInputs)
	a, err := NewAgent(c)
	assert.NoError(t, err)
	input := &models.RunningInput{
		Config: &models.InputConfig{},
	}
	// Both flags are expected to be off before any case modifies them.
	assert.False(t, a.Config.Agent.IgnoreErrorInputs)
	assert.False(t, input.Config.IgnoreInitError)

	// Each case states both flags explicitly so the description cannot drift
	// from the state that is actually under test.
	cases := []struct {
		name        string
		inputIgnore bool
		agentIgnore bool
		want        bool
	}{
		{name: "input.ignore_init_error=false and agent.ignore_error_inputs=false", inputIgnore: false, agentIgnore: false, want: false},
		{name: "input.ignore_init_error=false and agent.ignore_error_inputs=true", inputIgnore: false, agentIgnore: true, want: true},
		{name: "input.ignore_init_error=true and agent.ignore_error_inputs=true", inputIgnore: true, agentIgnore: true, want: true},
		{name: "input.ignore_init_error=true and agent.ignore_error_inputs=false", inputIgnore: true, agentIgnore: false, want: true},
	}
	for _, tc := range cases {
		input.Config.IgnoreInitError = tc.inputIgnore
		a.Config.Agent.IgnoreErrorInputs = tc.agentIgnore
		assert.Equal(t, tc.want, a.isIgnoreInput(input), tc.name)
	}
}

// testIgnoreErrorInput is a stub input whose Init always fails; the tests in
// this file use it to exercise the handling of initialization errors.
type testIgnoreErrorInput struct{}

// Init always returns an error.
func (*testIgnoreErrorInput) Init() error {
	return fmt.Errorf("could not initialize input: test error")
}

// Gather does nothing and reports success.
func (*testIgnoreErrorInput) Gather(_ telegraf.Accumulator) error {
	return nil
}

// SampleConfig returns an empty sample configuration.
func (*testIgnoreErrorInput) SampleConfig() string {
	return ""
}

// TestAgent_IgnoreErrorInputs exercises initPlugins with an input whose Init
// fails: initialization is expected to return an error while
// agent.ignore_error_inputs is off, and to succeed with the input removed
// once the option is turned on.
func TestAgent_IgnoreErrorInputs(t *testing.T) {
	cfg := config.NewConfig()
	assert.False(t, cfg.Agent.IgnoreErrorInputs)

	// Phase 1: an empty RunningInput is expected to initialize cleanly and
	// remain in the input list.
	cfg.Inputs = []*models.RunningInput{{}}
	ag, err := NewAgent(cfg)
	assert.NoError(t, err)
	assert.NoError(t, ag.initPlugins())
	assert.Len(t, cfg.Inputs, 1)

	// Phase 2: with the option off, a failing input makes initPlugins return
	// an error and the input list is left as it was.
	failing := &models.RunningInput{
		Config: &models.InputConfig{
			Name:     "test error input",
			Alias:    "test alias",
			Interval: 10 * time.Second,
		},
		Input: &testIgnoreErrorInput{},
	}
	cfg.Inputs = []*models.RunningInput{failing}
	ag, err = NewAgent(cfg)
	assert.NoError(t, err)
	assert.Error(t, ag.initPlugins())
	assert.Len(t, cfg.Inputs, 1)

	// Phase 3: with the option on, the same input is dropped and
	// initialization succeeds.
	cfg.Agent.IgnoreErrorInputs = true
	assert.NoError(t, ag.initPlugins())
	assert.Len(t, cfg.Inputs, 0)
}

func TestAgent_LoadPlugin(t *testing.T) {
c := config.NewConfig()
c.InputFilters = []string{"mysql"}
Expand Down
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ type AgentConfig struct {
// Pick a timezone to use when logging or type 'local' for local time.
LogWithTimezone string `toml:"log_with_timezone"`

// IgnoreErrorInputs makes the agent skip input plugins that fail to initialize instead of aborting.
IgnoreErrorInputs bool `toml:"ignore_error_inputs"`

Hostname string
OmitHostname bool

Expand Down Expand Up @@ -418,6 +421,9 @@ var agentConfig = `
## Example: America/Chicago
# log_with_timezone = ""

## Whether to skip input plugins that return an error during initialization.
# ignore_error_inputs = false

## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
Expand Down Expand Up @@ -1517,6 +1523,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &cp.NameOverride)
c.getFieldString(tbl, "alias", &cp.Alias)
c.getFieldBool(tbl, "ignore_init_error", &cp.IgnoreInitError)

cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
Expand Down Expand Up @@ -1842,7 +1849,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"prefix", "prometheus_export_timestamp", "prometheus_ignore_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
"separator", "splunkmetric_hec_routing", "splunkmetric_multimetric", "tag_keys",
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "template", "templates",
"value_field_name", "wavefront_source_override", "wavefront_use_strict", "wavefront_disable_prefix_conversion":
"value_field_name", "wavefront_source_override", "wavefront_use_strict", "wavefront_disable_prefix_conversion", "ignore_init_error":

// ignore fields that are common to all plugins.
default:
Expand Down
22 changes: 22 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,26 @@ func TestConfig_LoadSingleInput(t *testing.T) {
require.Equal(t, inputConfig, c.Inputs[0].Config, "Testdata did not produce correct memcached metadata.")
}

// TestConfig_ignoreErrorInput verifies that the per-input ignore_init_error
// option is read from testdata/ignore_error_input.toml: it must be true for
// the input that sets it to true, and false both for the input that omits it
// and for the input that sets it to false.
func TestConfig_ignoreErrorInput(t *testing.T) {
	cases := []struct {
		filter string // input plugin selected via InputFilters
		want   bool   // expected IgnoreInitError on the loaded input
	}{
		{filter: "memcached", want: true},
		{filter: "http_listener_v2", want: false},
		{filter: "ignore_init_error_test", want: false},
	}
	for _, tc := range cases {
		// A fresh Config per case keeps the loaded inputs independent.
		c := NewConfig()
		c.InputFilters = []string{tc.filter}
		require.NoError(t, c.LoadConfig("./testdata/ignore_error_input.toml"), tc.filter)
		// Guard the index below so that a loading regression fails the test
		// with a clear message instead of panicking on an empty slice.
		require.NotEmpty(t, c.Inputs, tc.filter)
		require.Equal(t, tc.want, c.Inputs[0].Config.IgnoreInitError, tc.filter)
	}
}

func TestConfig_LoadDirectory(t *testing.T) {
c := NewConfig()
require.NoError(t, c.LoadConfig("./testdata/single_plugin.toml"))
Expand Down Expand Up @@ -720,6 +740,8 @@ func init() {
inputs.Add("memcached", func() telegraf.Input { return &MockupInputPlugin{} })
inputs.Add("procstat", func() telegraf.Input { return &MockupInputPlugin{} })

inputs.Add("ignore_init_error_test", func() telegraf.Input { return &MockupInputPlugin{} })

// Register the mockup output plugin for the required names
outputs.Add("azure_monitor", func() telegraf.Output { return &MockupOuputPlugin{NamespacePrefix: "Telegraf/"} })
outputs.Add("http", func() telegraf.Output { return &MockupOuputPlugin{} })
Expand Down
7 changes: 7 additions & 0 deletions config/testdata/ignore_error_input.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Input that opts in to ignoring initialization errors.
[[inputs.memcached]]
ignore_init_error=true

# Input that leaves ignore_init_error unset.
[[inputs.http_listener_v2]]

# Input that explicitly sets ignore_init_error to false.
[[inputs.ignore_init_error_test]]
ignore_init_error=false
15 changes: 15 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ The agent table configures Telegraf and the defaults used across all plugins.
translates by calling external programs snmptranslate and snmptable,
or "gosmi" which translates using the built-in gosmi library.

- **ignore_error_inputs**:
If set to true, input plugins that return an error during initialization are discarded and the remaining inputs are kept. Otherwise, the program exits when any input plugin fails to initialize.

## Plugins

Telegraf plugins are divided into 4 types: [inputs][], [outputs][],
Expand Down Expand Up @@ -300,6 +303,10 @@ Parameters that can be used with any input plugin:
- **name_override**: Override the base name of the measurement. (Default is
the name of the input).

- **ignore_init_error**:
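Enables ignoring of initialization errors for this plugin even when the `ignore_error_inputs` setting of the [agent][Agent] is false; setting it to false does not disable an agent-level `ignore_error_inputs = true`.
If set to true, the plugin is discarded when its initialization fails. Otherwise, the program exits when the plugin fails to initialize, unless `ignore_error_inputs` is enabled.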

- **name_prefix**: Specifies a prefix to attach to the measurement name.

- **name_suffix**: Specifies a suffix to attach to the measurement name.
Expand Down Expand Up @@ -329,6 +336,14 @@ Use the name_override parameter to emit measurements with the name `foobar`:
totalcpu = true
```

Use the `ignore_init_error` parameter to skip the `docker` input when its initialization fails:

```toml
[[inputs.docker]]
ignore_init_error = true
servers = ["127.0.0.1:27017"]
```

Emit measurements with two additional tags: `tag1=foo` and `tag2=bar`

> **NOTE**: With TOML, order matters. Parameters belong to the last defined
Expand Down
3 changes: 3 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@
## Example: America/Chicago
# log_with_timezone = ""

## Whether to skip input plugins that return an error during initialization.
# ignore_error_inputs = false

## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
Expand Down
3 changes: 3 additions & 0 deletions etc/telegraf_windows.conf
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@
## Example: America/Chicago
# log_with_timezone = ""

## Whether to skip input plugins that return an error during initialization.
# ignore_error_inputs = false

## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
Expand Down
1 change: 1 addition & 0 deletions models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type InputConfig struct {
CollectionJitter time.Duration
CollectionOffset time.Duration
Precision time.Duration
IgnoreInitError bool

NameOverride string
MeasurementPrefix string
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func TestCachesExpireAfterMaxTTL(t *testing.T) {
require.NoError(t, s.parseStatsdLine("valid:45|c"))
require.NoError(t, s.Gather(acc))

// Wait for the metrics to arrive
// Wait for the metrics to arrive
observeralone marked this conversation as resolved.
Show resolved Hide resolved
acc.Wait(3)

testutil.RequireMetricsEqual(t,
Expand Down