fix(inputs/directory_monitor): Add support for multiline file parsing #11234

Merged
14 changes: 11 additions & 3 deletions plugins/inputs/directory_monitor/README.md
@@ -2,8 +2,8 @@

This plugin monitors a single directory (without looking at sub-directories),
and takes in each file placed in the directory. The plugin will gather all
files in the directory at a configurable interval (`monitor_interval`), and
parse the ones that haven't been picked up yet.
files in the directory at the configured interval, and parse the ones that
haven't been picked up yet.

This plugin is intended to read files that are moved or copied to the monitored
directory, and thus files should also not be used by another process or else
@@ -54,10 +54,18 @@ be guaranteed to finish writing before the `directory_duration_threshold`.
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
#
## The data format to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
data_format = "influx"
```

## Metrics

The format of metrics produced by this plugin depends on the content and data
format of the file.
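To illustrate the new option, here is a minimal configuration sketch (the directory paths are placeholders, not taken from this PR) that reads whole files of pretty-printed JSON:

```toml
[[inputs.directory_monitor]]
  ## Placeholder paths; point these at real directories.
  directory = "/tmp/telegraf/incoming"
  finished_directory = "/tmp/telegraf/finished"

  ## Hand each file to the parser in one piece, so a JSON document
  ## that spans several lines is parsed as a single unit.
  parse_method = "at-once"
  data_format = "json"
```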
63 changes: 50 additions & 13 deletions plugins/inputs/directory_monitor/directory_monitor.go
@@ -20,6 +20,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
@@ -36,6 +37,7 @@ var (
defaultMaxBufferedMetrics = 10000
defaultDirectoryDurationThreshold = config.Duration(0 * time.Millisecond)
defaultFileQueueSize = 100000
defaultParseMethod = "line-by-line"
)

@@ -50,6 +52,7 @@ type DirectoryMonitor struct {
DirectoryDurationThreshold config.Duration `toml:"directory_duration_threshold"`
Log telegraf.Logger `toml:"-"`
FileQueueSize int `toml:"file_queue_size"`
ParseMethod string `toml:"parse_method"`

filesInUse sync.Map
cancel context.CancelFunc
@@ -200,7 +203,7 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {

parser, err := monitor.parserFunc()
if err != nil {
return fmt.Errorf("E! Creating parser: %s", err.Error())
return fmt.Errorf("creating parser: %w", err)
}

// Handle gzipped files.
@@ -218,41 +221,70 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
}

func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error {
var splitter bufio.SplitFunc

// Decide on how to split the file
switch monitor.ParseMethod {
case "at-once":
return monitor.parseAtOnce(parser, reader, fileName)
case "line-by-line":
splitter = bufio.ScanLines
default:
return fmt.Errorf("unknown parse method %q", monitor.ParseMethod)
}

scanner := bufio.NewScanner(reader)
scanner.Split(splitter)

for scanner.Scan() {
metrics, err := monitor.parseLine(parser, scanner.Bytes())
metrics, err := monitor.parseMetrics(parser, scanner.Bytes(), fileName)
if err != nil {
return err
}

if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))
}
}

if err := monitor.sendMetrics(metrics); err != nil {
return err
}
}

return nil
return scanner.Err()
}

func (monitor *DirectoryMonitor) parseAtOnce(parser parsers.Parser, reader io.Reader, fileName string) error {
bytes, err := io.ReadAll(reader)
if err != nil {
return err
}

metrics, err := monitor.parseMetrics(parser, bytes, fileName)
if err != nil {
return err
}

return monitor.sendMetrics(metrics)
}

func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte) ([]telegraf.Metric, error) {
func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) {
switch parser.(type) {
case *csv.Parser:
m, err := parser.Parse(line)
metrics, err = parser.Parse(line)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
}
return m, err
default:
return parser.Parse(line)
metrics, err = parser.Parse(line)
}

if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))
}
}

return metrics, err
}

func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error {
@@ -357,6 +389,10 @@ func (monitor *DirectoryMonitor) Init() error {
monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex)
}

if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "at-once"}); err != nil {
return fmt.Errorf("config option parse_method: %w", err)
}

return nil
}

@@ -368,6 +404,7 @@ func init() {
MaxBufferedMetrics: defaultMaxBufferedMetrics,
DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
})
}
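For reference, the dispatch that the reworked `parseFile` performs can be condensed into a small standalone sketch. A plain `parse` callback stands in for the Telegraf parser here, so this is illustrative rather than the plugin's exact code:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// parseFile mirrors the plugin's dispatch: either read everything at
// once, or scan line by line and hand each line to the parser.
func parseFile(reader io.Reader, parseMethod string, parse func([]byte) error) error {
	switch parseMethod {
	case "at-once":
		data, err := io.ReadAll(reader)
		if err != nil {
			return err
		}
		return parse(data)
	case "line-by-line":
		scanner := bufio.NewScanner(reader)
		scanner.Split(bufio.ScanLines)
		for scanner.Scan() {
			if err := parse(scanner.Bytes()); err != nil {
				return err
			}
		}
		return scanner.Err()
	default:
		return fmt.Errorf("unknown parse method %q", parseMethod)
	}
}

func main() {
	input := "line one\nline two\n"
	_ = parseFile(strings.NewReader(input), "line-by-line", func(chunk []byte) error {
		fmt.Printf("parsed chunk: %q\n", chunk)
		return nil
	})
}
```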
99 changes: 87 additions & 12 deletions plugins/inputs/directory_monitor/directory_monitor_test.go
@@ -9,11 +9,28 @@ import (

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/testutil"
)

func TestCreator(t *testing.T) {
creator, found := inputs.Inputs["directory_monitor"]
require.True(t, found)

expected := &DirectoryMonitor{
FilesToMonitor: defaultFilesToMonitor,
FilesToIgnore: defaultFilesToIgnore,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}

require.Equal(t, expected, creator())
}

func TestCSVGZImport(t *testing.T) {
acc := testutil.Accumulator{}
testCsvFile := "test.csv"
@@ -27,8 +44,9 @@ func TestCSVGZImport(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@@ -91,8 +109,9 @@ func TestMultipleJSONFileImports(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 1000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@@ -140,8 +159,9 @@ func TestFileTag(t *testing.T) {
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
FileTag: "filename",
MaxBufferedMetrics: 1000,
FileQueueSize: 1000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@@ -194,8 +214,9 @@ func TestCSVNoSkipRows(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@@ -262,8 +283,9 @@ func TestCSVSkipRows(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@@ -332,8 +354,9 @@ func TestCSVMultiHeader(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@@ -387,3 +410,55 @@ hello,80,test_name2`
require.Equal(t, expectedFields, m.Fields)
}
}

func TestParseCompleteFile(t *testing.T) {
acc := testutil.Accumulator{}

// Establish process directory and finished directory.
finishedDirectory := t.TempDir()
processDirectory := t.TempDir()

// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: "at-once",
}
err := r.Init()
require.NoError(t, err)
r.Log = testutil.Logger{}

parserConfig := parsers.Config{
DataFormat: "json",
JSONNameKey: "name",
TagKeys: []string{"tag1"},
}

r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
})

testJSON := `{
"name": "test1",
"value": 100.1,
"tag1": "value1"
}`

// Write json file to process into the 'process' directory.
f, _ := os.CreateTemp(processDirectory, "test.json")
_, _ = f.WriteString(testJSON)
_ = f.Close()

err = r.Start(&acc)
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(1)
r.Stop()

require.NoError(t, acc.FirstError())
require.Len(t, acc.Metrics, 1)
testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())
}
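The new `TestParseCompleteFile` case exercises exactly the situation that motivates `at-once`: one JSON document spread over several lines. A standalone sketch (not part of this PR) of why line-by-line scanning cannot parse such a file:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	doc := `{
  "name": "test1",
  "value": 100.1
}`

	// Line-by-line: every scanned line is only a fragment of the
	// document, so each json.Unmarshal call fails.
	scanner := bufio.NewScanner(strings.NewReader(doc))
	for scanner.Scan() {
		var v map[string]interface{}
		if err := json.Unmarshal(scanner.Bytes(), &v); err != nil {
			fmt.Printf("line %q: %v\n", scanner.Text(), err)
		}
	}

	// At-once: the whole document parses cleanly.
	var whole map[string]interface{}
	if err := json.Unmarshal([]byte(doc), &whole); err == nil {
		fmt.Println("whole document parsed:", whole)
	}
}
```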
5 changes: 4 additions & 1 deletion plugins/inputs/directory_monitor/sample.conf
@@ -36,9 +36,12 @@
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
#
## The data format to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
data_format = "influx"