Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Parser plugin restructuring #8791

Merged
merged 51 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
d9a52f6
Create public parser interface.
Feb 2, 2021
acecda4
Add running-parser implementation.
Feb 2, 2021
a3a3dc3
Prepare parser registry.
Feb 2, 2021
543a279
Add internal config management structure for parsers.
Feb 2, 2021
c044d92
Implement parser registry and keep parsers.Parser definition for back…
Feb 2, 2021
b1fcc53
Add config functions for instantiating parsers the new way.
Feb 2, 2021
06ace41
Add function for tracking missing TOML fields locally in order to e.g…
Feb 2, 2021
be51ee2
Use the new way of instantiating parsers if available and fallback to…
Feb 2, 2021
254fa6e
Make sure running parsers' Init() function is called.
Feb 2, 2021
9925f3c
Convert CSV parser to new instantiation model.
Feb 2, 2021
f627f7c
Remove old CSV leftovers.
Feb 2, 2021
800f601
Move ParserFunc infra to telegraf.Parser and replace parser.Parser ev…
Feb 2, 2021
b68cc0e
Fixup formatting.
Feb 2, 2021
4e4301a
Fix file unit-test.
Feb 2, 2021
3518bb3
Return running-parser instance to allow collection of internal perfor…
Feb 2, 2021
0e4410c
Avoid CSV exception in inputs.tail and try a more general solution.
Feb 2, 2021
442efec
Implement new instantiation model for SetParserFunc inputs.
Feb 2, 2021
1f0d5fb
Fix formatting.
Feb 3, 2021
a3e6dbd
Make sure parser's Init() is only called on telegraf.ParserFunc call …
Feb 3, 2021
4a4f87f
Provide reason for blank import.
Feb 4, 2021
55484cd
Fix rebase fallout.
Mar 19, 2021
9b43f28
Fix rebase fallout.
Nov 18, 2021
0dc83e5
Fix linter error.
Nov 18, 2021
dbc00fd
Remove unnecessary 'HeaderParser' interface.
Nov 18, 2021
e279852
Update comment.
Nov 18, 2021
7870cfc
Remove stateful parser logic.
Nov 18, 2021
42e6a80
Bring back old parser interface for backward compatibility.
Nov 18, 2021
e8ee33e
Revert all plugins to the 'old' parser interface, only keeping plugin…
Nov 18, 2021
d41a2ea
Move parser definitions back to their original file to minimize diff.
Nov 20, 2021
0432cc8
Remove debug message.
Nov 20, 2021
fdc82cd
Add unit-test for new parser interface.
Nov 20, 2021
d601a86
Always initialize parsers if they provide an Init() function.
Nov 20, 2021
9b22133
Add unit-test for old parser interface.
Nov 20, 2021
9642fba
Revert changes in influxparser to minimize diff.
Nov 20, 2021
259c475
Revert changes in tail to minimize diff.
Nov 20, 2021
12f48cf
Revert changes in directory_monitor to minimize diff.
Nov 20, 2021
a380d8b
Revert changes in file to minimize diff.
Nov 20, 2021
adb8bee
Fix formatting.
Nov 20, 2021
51f0d19
Add testdata for old interface tests.
Nov 20, 2021
c8b54b7
Bring back the old way of instantiating the parser to keep compatible…
Dec 1, 2021
b29111d
Fix backward-compatible initialization with the old way for CSV.
Dec 1, 2021
b4e2683
Add compatibility test for the old way of setting up parsers.
Dec 1, 2021
a5d1f22
Improve parser tests.
Dec 2, 2021
fe92ef3
Remove commented imports.
Dec 2, 2021
8d447db
Remove double import and provide information which parser is used in …
Dec 2, 2021
c074f92
Improve deprecation comments.
Dec 2, 2021
106413d
Fix rebase fallout.
Jan 12, 2022
2a468d8
Fix missing logger after rebase.
Jan 12, 2022
bb789ef
Implement proper handling of SetParserFunc-user plugins.
Jan 12, 2022
8109072
Remove unused 'once' field.
Jan 12, 2022
0b7f4d7
Fix function documentation.
Jan 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (a *Agent) initPlugins() error {
input.LogName(), err)
}
}
for _, parser := range a.Config.Parsers {
err := parser.Init()
sspaink marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("could not initialize parser %s::%s: %v",
parser.Config.DataFormat, parser.Config.Parent, err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/parsers/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"gopkg.in/tomb.v1"
)
Expand Down
225 changes: 179 additions & 46 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Config struct {
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Aggregators []*models.RunningAggregator
Parsers []*models.RunningParser
// Processors have a slice wrapper type because they need to be sorted
Processors models.RunningProcessors
AggProcessors models.RunningProcessors
Expand Down Expand Up @@ -103,6 +104,7 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Parsers: make([]*models.RunningParser, 0),
Processors: make([]*models.RunningProcessor, 0),
AggProcessors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
Expand Down Expand Up @@ -233,6 +235,15 @@ func (c *Config) AggregatorNames() []string {
return PluginNameCounts(name)
}

// ParserNames returns a list of strings of the configured parsers.
func (c *Config) ParserNames() []string {
var name []string
for _, parser := range c.Parsers {
name = append(name, parser.Config.DataFormat)
}
return PluginNameCounts(name)
}

// ProcessorNames returns a list of strings of the configured processors.
func (c *Config) ProcessorNames() []string {
var name []string
Expand Down Expand Up @@ -1048,6 +1059,39 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
return nil
}

func (c *Config) probeParser(table *ast.Table) bool {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)

_, ok := parsers.Parsers[dataformat]
return ok
}

func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)

creator, ok := parsers.Parsers[dataformat]
if !ok {
return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat)
}
parser := creator(parentname)

conf, err := c.buildParser(parentname, table)
if err != nil {
return nil, err
}

if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err
}

running := models.NewRunningParser(parser, conf)
c.Parsers = append(c.Parsers, running)

return running, nil
}

func (c *Config) addProcessor(name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
Expand Down Expand Up @@ -1162,6 +1206,17 @@ func (c *Config) addInput(name string, table *ast.Table) error {
name = "diskio"
}

// For inputs with parsers we need to compute the set of
// options that is not covered by both, the parser and the input.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
// missing entries at the end.
missThreshold := 0
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()

creator, ok := inputs.Inputs[name]
if !ok {
// Handle removed, deprecated plugins
Expand All @@ -1174,35 +1229,95 @@ func (c *Config) addInput(name string, table *ast.Table) error {
}
input := creator()

// If the input has a SetParser function, then this means it can accept
// arbitrary types of input, so build the parser and set it.
if t, ok := input.(parsers.ParserInput); ok {
parser, err := c.buildParser(name, table)
if err != nil {
return err
// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data-formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
t.SetParser(parser)
}

if t, ok := input.(parsers.ParserFuncInput); ok {
config, err := c.getParserConfig(name, table)
if err != nil {
return err
// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := parsers.NewParser(config)
}

if t, ok := input.(telegraf.ParserFuncInput); ok {
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (telegraf.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return nil, err
return err
}
logger := models.NewLogger("parsers", config.DataFormat, name)
models.SetLoggerOnPlugin(parser, logger)
if initializer, ok := parser.(telegraf.Initializer); ok {
if err := initializer.Init(); err != nil {
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.buildParserOld(name, config)
})
}
}

if t, ok := input.(parsers.ParserFuncInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
return parser, nil
})
t.SetParserFunc(func() (parsers.Parser, error) {
return c.buildParserOld(name, config)
})
}
}

pluginConfig, err := c.buildInput(name, table)
Expand All @@ -1221,6 +1336,17 @@ func (c *Config) addInput(name string, table *ast.Table) error {
rp := models.NewRunningInput(input, pluginConfig)
rp.SetDefaultTags(c.Tags)
c.Inputs = append(c.Inputs, rp)

// Check the number of misses against the threshold
for key, count := range missCount {
if count <= missThreshold {
continue
}
if err := c.missingTomlField(nil, key); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -1265,6 +1391,21 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
return conf, nil
}

// buildParser parses Parser specific items from the ast.Table,
// builds the filter and returns a
// models.ParserConfig to be inserted into models.RunningParser
func (c *Config) buildParser(name string, tbl *ast.Table) (*models.ParserConfig, error) {
var dataformat string
c.getFieldString(tbl, "data_format", &dataformat)

conf := &models.ParserConfig{
Parent: name,
DataFormat: dataformat,
}

return conf, nil
}

// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
Expand Down Expand Up @@ -1353,14 +1494,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
return cp, nil
}

// buildParser grabs the necessary entries from the ast.Table for creating
// buildParserOld grabs the necessary entries from the ast.Table for creating
// a parsers.Parser object, and creates it, which can then be added onto
// an Input object.
func (c *Config) buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
config, err := c.getParserConfig(name, tbl)
if err != nil {
return nil, err
}
func (c *Config) buildParserOld(name string, config *parsers.Config) (telegraf.Parser, error) {
parser, err := parsers.NewParser(config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1422,23 +1559,6 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
c.getFieldString(tbl, "grok_timezone", &pc.GrokTimezone)
c.getFieldString(tbl, "grok_unique_timestamp", &pc.GrokUniqueTimestamp)

//for csv parser
c.getFieldStringSlice(tbl, "csv_column_names", &pc.CSVColumnNames)
c.getFieldStringSlice(tbl, "csv_column_types", &pc.CSVColumnTypes)
c.getFieldStringSlice(tbl, "csv_tag_columns", &pc.CSVTagColumns)
c.getFieldString(tbl, "csv_timezone", &pc.CSVTimezone)
c.getFieldString(tbl, "csv_delimiter", &pc.CSVDelimiter)
c.getFieldString(tbl, "csv_comment", &pc.CSVComment)
c.getFieldString(tbl, "csv_measurement_column", &pc.CSVMeasurementColumn)
c.getFieldString(tbl, "csv_timestamp_column", &pc.CSVTimestampColumn)
c.getFieldString(tbl, "csv_timestamp_format", &pc.CSVTimestampFormat)
c.getFieldInt(tbl, "csv_header_row_count", &pc.CSVHeaderRowCount)
c.getFieldInt(tbl, "csv_skip_rows", &pc.CSVSkipRows)
c.getFieldInt(tbl, "csv_skip_columns", &pc.CSVSkipColumns)
c.getFieldBool(tbl, "csv_trim_space", &pc.CSVTrimSpace)
c.getFieldStringSlice(tbl, "csv_skip_values", &pc.CSVSkipValues)
c.getFieldBool(tbl, "csv_skip_errors", &pc.CSVSkipErrors)

c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys)

c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName)
Expand Down Expand Up @@ -1652,9 +1772,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", "csv_skip_errors",
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",
Expand All @@ -1679,6 +1796,22 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
return nil
}

func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) {
f := func(_ reflect.Type, key string) error {
if c, ok := counter[key]; ok {
counter[key] = c + 1
} else {
counter[key] = 1
}
return nil
}
c.toml.MissingField = f
}

func (c *Config) resetMissingTomlFieldTracker() {
c.toml.MissingField = c.missingTomlField
}

func (c *Config) getFieldString(tbl *ast.Table, fieldName string, target *string) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
Expand Down
Loading