Skip to content

Commit

Permalink
[7.17](backport #36256) Filter out duplicate paths resolved from matc…
Browse files Browse the repository at this point in the history
…hing globs (#36264)

Filter out duplicate paths resolved from matching globs (#36256)

Multiple globs can resolve to duplicate filenames. These duplicates
must be filtered out before they reach the creation of file descriptors
and the additional checks; otherwise they cause a flood of warning
messages in the logs.

These warnings should catch only the cases where symlinks
resolve to already-known filenames.

(cherry picked from commit 4d0276a)

---------

Co-authored-by: Denis <denis.rechkunov@elastic.co>
  • Loading branch information
mergify[bot] and rdner authored Aug 8, 2023
1 parent 68e9cc4 commit 8b932df
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 3 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Affecting all Beats*
- Upgrade Go to 1.19.12 {pull}36247[36247]


*Filebeat*

- Filter out duplicate paths resolved from matching globs. {issue}36253[36253] {pull}36256[36256]

*Auditbeat*

Expand Down
9 changes: 8 additions & 1 deletion filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
fdByName := map[string]loginp.FileDescriptor{}
// used to determine if a symlink resolves to an already known target
uniqueIDs := map[string]string{}

// used to filter out duplicate matches
uniqueFiles := map[string]struct{}{}
for _, path := range s.paths {
matches, err := filepath.Glob(path)
if err != nil {
Expand All @@ -366,6 +367,12 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
}

for _, filename := range matches {
// in case multiple globs match on the same file we filter out duplicates
if _, knownFile := uniqueFiles[filename]; knownFile {
continue
}
uniqueFiles[filename] = struct{}{}

it, err := s.getIngestTarget(filename)
if err != nil {
s.log.Debugf("cannot create an ingest target for file %q: %s", filename, err)
Expand Down
57 changes: 57 additions & 0 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,63 @@ scanner:
// means no event
require.Equal(t, loginp.OpDone, e.Op)
})

// Two overlapping globs both match the second file. The subtest expects one
// create event for each file, in creation order, and no warning-level log
// entries recorded by the log observer.
t.Run("does not log warnings on duplicate globs and filters out duplicates", func(t *testing.T) {
dir := t.TempDir()
firstBasename := "file-123.ndjson"
secondBasename := "file-watcher-123.ndjson"
firstFilename := filepath.Join(dir, firstBasename)
secondFilename := filepath.Join(dir, secondBasename)
// Each file holds the 5-byte payload "line\n"; the expected descriptors
// below assert size: 5 accordingly.
err := os.WriteFile(firstFilename, []byte("line\n"), 0777)
require.NoError(t, err)
err = os.WriteFile(secondFilename, []byte("line\n"), 0777)
require.NoError(t, err)

paths := []string{
// to emulate the case we have in the agent monitoring:
// "file-*.ndjson" matches both files, while "file-watcher-*.ndjson"
// matches only the second one, so the second file is matched twice.
filepath.Join(dir, "file-*.ndjson"),
filepath.Join(dir, "file-watcher-*.ndjson"),
}
cfgStr := `
scanner:
check_interval: 100ms
`

// Bound the watcher's run time so the goroutine started below cannot
// outlive the 5-second timeout.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Send log output to the observer so the warning-level entries can be
// inspected at the end of the subtest via logp.ObserverLogs().
err = logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

fw := createWatcherWithConfig(t, paths, cfgStr)

// Run the watcher in the background; events are read from fw.Event().
go fw.Run(ctx)

// First event: creation of the first file.
e := fw.Event()
expEvent := loginp.FSEvent{
NewPath: firstFilename,
Op: loginp.OpCreate,
Descriptor: loginp.FileDescriptor{
Filename: firstFilename,
Info: testFileInfo{path: firstBasename, size: 5}, // size of "line\n"
},
}
requireEqualEvents(t, expEvent, e)

// Second event: creation of the second file (the one matched by both globs).
e = fw.Event()
expEvent = loginp.FSEvent{
NewPath: secondFilename,
Op: loginp.OpCreate,
Descriptor: loginp.FileDescriptor{
Filename: secondFilename,
Info: testFileInfo{path: secondBasename, size: 5}, // size of "line\n"
},
}
requireEqualEvents(t, expEvent, e)

// Take only the entries logged at exactly warning level and require that
// there are none.
logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll()
require.Lenf(t, logs, 0, "must be no warning messages, got: %v", logs)
})
}

func TestFileScanner(t *testing.T) {
Expand Down

0 comments on commit 8b932df

Please sign in to comment.