Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migrate graphite parser to new style #11405

Merged
merged 9 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := input.Parser.(*models.RunningParser); ok {
require.NoError(t, p.Init())
actual = append(actual, p.Parser)
} else {
actual = append(actual, input.Parser)
Expand Down Expand Up @@ -614,6 +615,7 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := input.Parser.(*models.RunningParser); ok {
require.NoError(t, p.Init())
actual = append(actual, p.Parser)
} else {
actual = append(actual, input.Parser)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/Shopify/sarama"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"

Expand Down Expand Up @@ -115,9 +116,9 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
k.acc = &acc
defer close(k.done)

var err error
k.parser, err = parsers.NewGraphiteParser("_", []string{}, nil)
require.NoError(t, err)
p := graphite.Parser{Separator: "_", Templates: []string{}}
require.NoError(t, p.Init())
k.parser = &p
go k.receiver()
in <- saramaMsg(testMsgGraphite)
acc.Wait(1)
Expand Down
5 changes: 3 additions & 2 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type Statsd struct {
// Max duration for each metric to stay cached without being updated.
MaxTTL config.Duration `toml:"max_ttl"`

graphiteParser *graphite.GraphiteParser
graphiteParser *graphite.Parser

acc telegraf.Accumulator

Expand Down Expand Up @@ -713,7 +713,8 @@ func (s *Statsd) parseName(bucket string) (name string, field string, tags map[s
var err error

if p == nil || s.graphiteParser.Separator != s.MetricSeparator {
p, err = graphite.NewGraphiteParser(s.MetricSeparator, s.Templates, nil)
p = &graphite.Parser{Separator: s.MetricSeparator, Templates: s.Templates}
err = p.Init()
s.graphiteParser = p
}

Expand Down
7 changes: 4 additions & 3 deletions plugins/inputs/tcp_listener/tcp_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
Expand Down Expand Up @@ -293,9 +294,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)

var err error
listener.parser, err = parsers.NewGraphiteParser("_", []string{}, nil)
require.NoError(t, err)
p := graphite.Parser{Separator: "_", Templates: []string{}}
require.NoError(t, p.Init())
listener.parser = &p
listener.wg.Add(1)
go listener.tcpParser()

Expand Down
1 change: 1 addition & 0 deletions plugins/parsers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/collectd"
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
_ "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
_ "github.com/influxdata/telegraf/plugins/parsers/graphite"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
_ "github.com/influxdata/telegraf/plugins/parsers/value"
Expand Down
67 changes: 37 additions & 30 deletions plugins/parsers/graphite/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)

// Minimum and maximum supported dates for timestamps.
Expand All @@ -20,45 +21,34 @@ var (
MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
)

type GraphiteParser struct {
Separator string
Templates []string
DefaultTags map[string]string
templateEngine *templating.Engine
}
type Parser struct {
Separator string `toml:"separator"`
Templates []string `toml:"templates"`
DefaultTags map[string]string ` toml:"-"`

func (p *GraphiteParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
templateEngine *templating.Engine
}

func NewGraphiteParser(
separator string,
templates []string,
defaultTags map[string]string,
) (*GraphiteParser, error) {
var err error

if separator == "" {
separator = DefaultSeparator
}
p := &GraphiteParser{
Separator: separator,
Templates: templates,
func (p *Parser) Init() error {
// Set defaults
if p.Separator == "" {
p.Separator = DefaultSeparator
}

if defaultTags != nil {
p.DefaultTags = defaultTags
defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")
if err != nil {
return fmt.Errorf("creating template failed: %w", err)
}
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
p.templateEngine, err = templating.NewEngine(p.Separator, defaultTemplate, p.Templates)

p.templateEngine, err = templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
if err != nil {
return p, fmt.Errorf("exec input parser config is error: %s ", err.Error())
return fmt.Errorf("creating template engine failed: %w ", err)
}
return p, nil

return nil
}

func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
// parse even if the buffer begins with a newline
if len(buf) != 0 && buf[0] == '\n' {
buf = buf[1:]
Expand Down Expand Up @@ -95,7 +85,7 @@ func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}

// ParseLine performs Graphite parsing of a single line.
func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(line)
if len(fields) != 2 && len(fields) != 3 {
Expand Down Expand Up @@ -178,7 +168,8 @@ func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {

// ApplyTemplate extracts the template fields from the given line and
// returns the measurement name and tags.
func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) {
//nolint:revive // This function should be eliminated anyway
func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) {
// Break line into fields (name, value, timestamp), only name is used
fields := strings.Fields(line)
if len(fields) == 0 {
Expand All @@ -196,3 +187,19 @@ func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string,

return name, tags, field, err
}

func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func init() {
parsers.Add("graphite", func(_ string) telegraf.Parser { return &Parser{} })
}

func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.Templates = append(p.Templates, config.Templates...)
p.Separator = config.Separator
p.DefaultTags = config.DefaultTags

return p.Init()
}
Loading