From 4d0276a3b36cd237a9dc0dd1ef793cdb02a5cb31 Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 8 Aug 2023 18:34:39 +0200 Subject: [PATCH] Filter out duplicate paths resolved from matching globs (#36256) Filter out duplicate paths resolved from matching globs Multiple globs can resolve in duplicate filenames. These duplicates must be filtered out even before they reach the creation of file descriptors and additional checks, otherwise this causes a flood of warning messages in logs. These warnings should catch only the cases when symlinks are resolved in known filenames. --- CHANGELOG.next.asciidoc | 1 + filebeat/input/filestream/fswatch.go | 9 +++- filebeat/input/filestream/fswatch_test.go | 57 +++++++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f6ad8c57740..f66eb22f6ea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077] - Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107] - Update mito CEL extension library to v1.5.0. {pull}36146[36146] +- Filter out duplicate paths resolved from matching globs. {issue}36253[36253] {pull}36256[36256] *Heartbeat* diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index ab166cefbe0..39d0275cfef 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -357,7 +357,8 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor { fdByName := map[string]loginp.FileDescriptor{} // used to determine if a symlink resolves in a already known target uniqueIDs := map[string]string{} - + // used to filter out duplicate matches + uniqueFiles := map[string]struct{}{} for _, path := range s.paths { matches, err := filepath.Glob(path) if err != nil { @@ -366,6 +367,12 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor { } for _, filename := range matches { + // in case multiple globs match on the same file we filter out duplicates + if _, knownFile := uniqueFiles[filename]; knownFile { + continue + } + uniqueFiles[filename] = struct{}{} + it, err := s.getIngestTarget(filename) if err != nil { s.log.Debugf("cannot create an ingest target for file %q: %s", filename, err) diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index d798cbaa752..3ca65673b78 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -347,6 +347,63 @@ scanner: // means no event require.Equal(t, loginp.OpDone, e.Op) }) + + t.Run("does not log warnings on duplicate globs and filters out duplicates", func(t *testing.T) { + dir := t.TempDir() + firstBasename := "file-123.ndjson" + secondBasename := "file-watcher-123.ndjson" + firstFilename := filepath.Join(dir, firstBasename) + secondFilename := filepath.Join(dir, secondBasename) + err := os.WriteFile(firstFilename, []byte("line\n"), 0777) + require.NoError(t, err) + err = os.WriteFile(secondFilename, []byte("line\n"), 0777) + require.NoError(t, err) + + paths := []string{ + // to emulate the case we have in the agent monitoring + filepath.Join(dir, "file-*.ndjson"), + filepath.Join(dir, "file-watcher-*.ndjson"), + } + cfgStr := ` +scanner: + check_interval: 100ms +` + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err) + + fw := createWatcherWithConfig(t, paths, cfgStr) + + go fw.Run(ctx) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: firstFilename, + Op: loginp.OpCreate, + Descriptor: loginp.FileDescriptor{ + Filename: firstFilename, + Info: testFileInfo{name: firstBasename, size: 5}, // "line\n" + }, + } + requireEqualEvents(t, expEvent, e) + + e = fw.Event() + expEvent = loginp.FSEvent{ + NewPath: secondFilename, + Op: loginp.OpCreate, + Descriptor: loginp.FileDescriptor{ + Filename: secondFilename, + Info: testFileInfo{name: secondBasename, size: 5}, // "line\n" + }, + } + requireEqualEvents(t, expEvent, e) + + logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll() + require.Lenf(t, logs, 0, "must be no warning messages, got: %v", logs) + }) } func TestFileScanner(t *testing.T) {