diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go new file mode 100644 index 000000000000..d4bc1b5ea089 --- /dev/null +++ b/filebeat/input/filestream/fswatch.go @@ -0,0 +1,375 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "github.com/elastic/beats/v7/filebeat/input/file" + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/unison" +) + +const ( + recursiveGlobDepth = 8 + scannerName = "scanner" + watcherDebugKey = "file_watcher" +) + +var ( + watcherFactories = map[string]watcherFactory{ + scannerName: newScannerWatcher, + } +) + +type watcherFactory func(paths []string, cfg *common.Config) (loginp.FSWatcher, error) + +// fileScanner looks for files which match the patterns in paths. +// It is able to exclude files and symlinks. +type fileScanner struct { + paths []string + excludedFiles []match.Matcher + symlinks bool + + log *logp.Logger +} + +type fileWatcherConfig struct { + // Interval is the time between two scans. + Interval time.Duration + // Scanner is the configuration of the scanner. + Scanner fileScannerConfig +} + +// fileWatcher gets the list of files from a FSWatcher and creates events by +// comparing the files between its last two runs. +type fileWatcher struct { + interval time.Duration + prev map[string]os.FileInfo + scanner loginp.FSScanner + log *logp.Logger + events chan loginp.FSEvent +} + +func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) { + if ns == nil { + return newScannerWatcher(paths, nil) + } + + watcherType := ns.Name() + f, ok := watcherFactories[watcherType] + if !ok { + return nil, fmt.Errorf("no such file watcher: %s", watcherType) + } + + return f(paths, ns.Config()) +} + +func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, error) { + config := defaultFileWatcherConfig() + err := c.Unpack(&config) + if err != nil { + return nil, err + } + scanner, err := newFileScanner(paths, config.Scanner) + if err != nil { + return nil, err + } + return &fileWatcher{ + log: logp.NewLogger(watcherDebugKey), + interval: config.Interval, + prev: make(map[string]os.FileInfo, 0), + scanner: scanner, + events: make(chan loginp.FSEvent), + }, nil +} + +func defaultFileWatcherConfig() fileWatcherConfig { + return fileWatcherConfig{ + Interval: 10 * time.Second, + Scanner: defaultFileScannerConfig(), + } +} + +func (w *fileWatcher) Run(ctx unison.Canceler) { + defer close(w.events) + + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + w.watch(ctx) + } + } +} + +func (w *fileWatcher) watch(ctx unison.Canceler) { + w.log.Info("Start next scan") + + paths := w.scanner.GetFiles() + + newFiles := make(map[string]os.FileInfo) + + for path, info := range paths { + + prevInfo, ok := w.prev[path] + if !ok { + newFiles[path] = paths[path] + continue + } + + if prevInfo.ModTime() != info.ModTime() { + select { + case <-ctx.Done(): + return + case w.events <- writeEvent(path, info): + } + } + + // delete from previous state, as we have more up to date info + delete(w.prev, path) + } + + // remaining files are in the prev map are the ones that are missing + // either because they have been deleted or renamed + for removedPath, removedInfo := range w.prev { + for newPath, newInfo := range newFiles { + if os.SameFile(removedInfo, newInfo) { + select { + case <-ctx.Done(): + return + case w.events <- renamedEvent(removedPath, newPath, newInfo): + delete(newFiles, newPath) + goto CHECK_NEXT_REMOVED + } + } + } + + select { + case <-ctx.Done(): + return + case w.events <- deleteEvent(removedPath, removedInfo): + } + CHECK_NEXT_REMOVED: + } + + // remaining files in newFiles are new + for path, info := range newFiles { + select { + case <-ctx.Done(): + return + case w.events <- createEvent(path, info): + } + + } + + w.log.Debugf("Found %d paths", len(paths)) + w.prev = paths +} + +func createEvent(path string, fi os.FileInfo) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: path, Info: fi} +} + +func writeEvent(path string, fi os.FileInfo) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi} +} + +func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Info: fi} +} + +func deleteEvent(path string, fi os.FileInfo) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpDelete, OldPath: path, NewPath: "", Info: fi} +} + +func (w *fileWatcher) Event() loginp.FSEvent { + return <-w.events +} + +type fileScannerConfig struct { + Paths []string + ExcludedFiles []match.Matcher + Symlinks bool + RecursiveGlob bool +} + +func defaultFileScannerConfig() fileScannerConfig { + return fileScannerConfig{ + Symlinks: false, + RecursiveGlob: true, + } +} + +func newFileScanner(paths []string, cfg fileScannerConfig) (loginp.FSScanner, error) { + fs := fileScanner{ + paths: paths, + excludedFiles: cfg.ExcludedFiles, + symlinks: cfg.Symlinks, + log: logp.NewLogger(scannerName), + } + err := fs.resolveRecursiveGlobs(cfg) + if err != nil { + return nil, err + } + err = fs.normalizeGlobPatterns() + if err != nil { + return nil, err + } + + return &fs, nil +} + +// resolveRecursiveGlobs expands `**` from the globs in multiple patterns +func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error { + if !c.RecursiveGlob { + s.log.Debug("recursive glob disabled") + return nil + } + + s.log.Debug("recursive glob enabled") + var paths []string + for _, path := range s.paths { + patterns, err := file.GlobPatterns(path, recursiveGlobDepth) + if err != nil { + return err + } + if len(patterns) > 1 { + s.log.Debugf("%q expanded to %#v", path, patterns) + } + paths = append(paths, patterns...) + } + s.paths = paths + return nil +} + +// normalizeGlobPatterns calls `filepath.Abs` on all the globs from config +func (s *fileScanner) normalizeGlobPatterns() error { + var paths []string + for _, path := range s.paths { + pathAbs, err := filepath.Abs(path) + if err != nil { + return fmt.Errorf("failed to get the absolute path for %s: %v", path, err) + } + paths = append(paths, pathAbs) + } + s.paths = paths + return nil +} + +// GetFiles returns a map of files and fileinfos which +// match the configured paths. +func (s *fileScanner) GetFiles() map[string]os.FileInfo { + pathInfo := map[string]os.FileInfo{} + + for _, path := range s.paths { + matches, err := filepath.Glob(path) + if err != nil { + s.log.Errorf("glob(%s) failed: %v", path, err) + continue + } + + for _, file := range matches { + if s.shouldSkipFile(file) { + continue + } + + // If symlink is enabled, it is checked that original is not part of same input + // If original is harvested by other input, states will potentially overwrite each other + if s.isOriginalAndSymlinkConfigured(file, pathInfo) { + continue + } + + fileInfo, err := os.Stat(file) + if err != nil { + s.log.Debug("stat(%s) failed: %s", file, err) + continue + } + pathInfo[file] = fileInfo + } + } + + return pathInfo +} + +func (s *fileScanner) shouldSkipFile(file string) bool { + if s.isFileExcluded(file) { + s.log.Debugf("Exclude file: %s", file) + return true + } + + fileInfo, err := os.Lstat(file) + if err != nil { + s.log.Debugf("lstat(%s) failed: %s", file, err) + return true + } + + if fileInfo.IsDir() { + s.log.Debugf("Skipping directory: %s", file) + return true + } + + isSymlink := fileInfo.Mode()&os.ModeSymlink > 0 + if isSymlink && !s.symlinks { + s.log.Debugf("File %s skipped as it is a symlink", file) + return true + } + + return false +} + +func (s *fileScanner) isOriginalAndSymlinkConfigured(file string, paths map[string]os.FileInfo) bool { + if s.symlinks { + fileInfo, err := os.Stat(file) + if err != nil { + s.log.Debugf("stat(%s) failed: %s", file, err) + return false + } + + for _, finfo := range paths { + if os.SameFile(finfo, fileInfo) { + s.log.Info("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name()) + return true + } + } + } + return false +} + +func (s *fileScanner) isFileExcluded(file string) bool { + return len(s.excludedFiles) > 0 && s.matchAny(s.excludedFiles, file) +} + +// matchAny checks if the text matches any of the regular expressions +func (s *fileScanner) matchAny(matchers []match.Matcher, text string) bool { + for _, m := range matchers { + if m.MatchString(text) { + return true + } + } + return false +} diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go new file mode 100644 index 000000000000..4979c9275c80 --- /dev/null +++ b/filebeat/input/filestream/fswatch_test.go @@ -0,0 +1,268 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/logp" +) + +var ( + excludedFilePath = filepath.Join("testdata", "excluded_file") + includedFilePath = filepath.Join("testdata", "included_file") + directoryPath = filepath.Join("testdata", "unharvestable_dir") +) + +func TestFileScanner(t *testing.T) { + testCases := map[string]struct { + paths []string + excludedFiles []match.Matcher + symlinks bool + expectedFiles []string + }{ + "select all files": { + paths: []string{excludedFilePath, includedFilePath}, + expectedFiles: []string{ + mustAbsPath(excludedFilePath), + mustAbsPath(includedFilePath), + }, + }, + "skip excluded files": { + paths: []string{excludedFilePath, includedFilePath}, + excludedFiles: []match.Matcher{ + match.MustCompile("excluded_file"), + }, + expectedFiles: []string{ + mustAbsPath(includedFilePath), + }, + }, + "skip directories": { + paths: []string{directoryPath}, + expectedFiles: []string{}, + }, + } + + setupFilesForScannerTest(t) + defer removeFilesOfScannerTest(t) + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + cfg := fileScannerConfig{ + ExcludedFiles: test.excludedFiles, + Symlinks: test.symlinks, + RecursiveGlob: false, + } + fs, err := newFileScanner(test.paths, cfg) + if err != nil { + t.Fatal(err) + } + files := fs.GetFiles() + paths := make([]string, 0) + for p, _ := range files { + paths = append(paths, p) + } + assert.True(t, checkIfSameContents(test.expectedFiles, paths)) + }) + } +} + +func setupFilesForScannerTest(t *testing.T) { + err := os.MkdirAll(directoryPath, 0750) + if err != nil { + t.Fatal(t) + } + + for _, path := range []string{excludedFilePath, includedFilePath} { + f, err := os.Create(path) + if err != nil { + t.Fatalf("file %s, error %v", path, err) + } + f.Close() + } +} + +func removeFilesOfScannerTest(t *testing.T) { + err := os.RemoveAll("testdata") + if err != nil { + t.Fatal(err) + } +} + +// only handles sets +func checkIfSameContents(one, other []string) bool { + if len(one) != len(other) { + return false + } + + mustFind := len(one) + for _, oneElem := range one { + for _, otherElem := range other { + if oneElem == otherElem { + mustFind-- + } + } + } + return mustFind == 0 +} + +func TestFileWatchNewDeleteModified(t *testing.T) { + oldTs := time.Now() + newTs := oldTs.Add(5 * time.Second) + testCases := map[string]struct { + prevFiles map[string]os.FileInfo + nextFiles map[string]os.FileInfo + expectedEvents []loginp.FSEvent + }{ + "one new file": { + prevFiles: map[string]os.FileInfo{}, + nextFiles: map[string]os.FileInfo{ + "new_path": testFileInfo{"new_path", 5, oldTs}, + }, + expectedEvents: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "new_path", Info: testFileInfo{"new_path", 5, oldTs}}, + }, + }, + "one deleted file": { + prevFiles: map[string]os.FileInfo{ + "old_path": testFileInfo{"old_path", 5, oldTs}, + }, + nextFiles: map[string]os.FileInfo{}, + expectedEvents: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "old_path", NewPath: "", Info: testFileInfo{"old_path", 5, oldTs}}, + }, + }, + "one modified file": { + prevFiles: map[string]os.FileInfo{ + "path": testFileInfo{"path", 5, oldTs}, + }, + nextFiles: map[string]os.FileInfo{ + "path": testFileInfo{"path", 10, newTs}, + }, + expectedEvents: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 10, newTs}}, + }, + }, + "two modified files": { + prevFiles: map[string]os.FileInfo{ + "path1": testFileInfo{"path1", 5, oldTs}, + "path2": testFileInfo{"path2", 5, oldTs}, + }, + nextFiles: map[string]os.FileInfo{ + "path1": testFileInfo{"path1", 10, newTs}, + "path2": testFileInfo{"path2", 10, newTs}, + }, + expectedEvents: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs}}, + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path2", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs}}, + }, + }, + "one modified file, one new file": { + prevFiles: map[string]os.FileInfo{ + "path1": testFileInfo{"path1", 5, oldTs}, + }, + nextFiles: map[string]os.FileInfo{ + "path1": testFileInfo{"path1", 10, newTs}, + "path2": testFileInfo{"path2", 10, newTs}, + }, + expectedEvents: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs}}, + loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs}}, + }, + }, + "one new file, one deleted file": { + prevFiles: map[string]os.FileInfo{ + "path_deleted": testFileInfo{"path_deleted", 5, oldTs}, + }, + nextFiles: map[string]os.FileInfo{ + "path_new": testFileInfo{"path_new", 10, newTs}, + }, + expectedEvents: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "path_deleted", NewPath: "", Info: testFileInfo{"path_deleted", 5, oldTs}}, + loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path_new", Info: testFileInfo{"path_new", 10, newTs}}, + }, + }, + } + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + w := fileWatcher{ + log: logp.L(), + prev: test.prevFiles, + scanner: &mockScanner{test.nextFiles}, + events: make(chan loginp.FSEvent), + } + + go w.watch(context.Background()) + + for _, expectedEvent := range test.expectedEvents { + evt := w.Event() + assert.Equal(t, expectedEvent, evt) + } + }) + } +} + +type mockScanner struct { + files map[string]os.FileInfo +} + +func (m *mockScanner) GetFiles() map[string]os.FileInfo { + return m.files +} + +type testFileInfo struct { + path string + size int64 + time time.Time +} + +func (t testFileInfo) Name() string { return t.path } +func (t testFileInfo) Size() int64 { return t.size } +func (t testFileInfo) Mode() os.FileMode { return 0 } +func (t testFileInfo) ModTime() time.Time { return t.time } +func (t testFileInfo) IsDir() bool { return false } +func (t testFileInfo) Sys() interface{} { return nil } + +func mustAbsPath(path string) string { + p, err := filepath.Abs(path) + if err != nil { + panic(err) + } + return p +} + +func mustDuration(durStr string) time.Duration { + dur, err := time.ParseDuration(durStr) + if err != nil { + panic(err) + } + return dur +} diff --git a/filebeat/input/filestream/fswatch_test_non_windows.go b/filebeat/input/filestream/fswatch_test_non_windows.go new file mode 100644 index 000000000000..eecfeddf9308 --- /dev/null +++ b/filebeat/input/filestream/fswatch_test_non_windows.go @@ -0,0 +1,144 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !windows + +package filestream + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestFileScannerSymlinks(t *testing.T) { + testCases := map[string]struct { + paths []string + excludedFiles []match.Matcher + symlinks bool + expectedFiles []string + }{ + // covers test_input.py/test_skip_symlinks + "skip symlinks": { + paths: []string{ + filepath.Join("testdata", "symlink_to_included_file"), + filepath.Join("testdata", "included_file"), + }, + symlinks: false, + expectedFiles: []string{ + mustAbsPath(filepath.Join("testdata", "included_file")), + }, + }, + "return a file once if symlinks are enabled": { + paths: []string{ + filepath.Join("testdata", "symlink_to_included_file"), + filepath.Join("testdata", "included_file"), + }, + symlinks: true, + expectedFiles: []string{ + mustAbsPath(filepath.Join("testdata", "included_file")), + }, + }, + } + + err := os.Symlink( + mustAbsPath(filepath.Join("testdata", "included_file")), + mustAbsPath(filepath.Join("testdata", "symlink_to_included_file")), + ) + if err != nil { + t.Fatal(err) + } + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + cfg := fileScannerConfig{ + ExcludedFiles: test.excludedFiles, + Symlinks: true, + RecursiveGlob: false, + } + fs, err := newFileScanner(test.paths, cfg) + if err != nil { + t.Fatal(err) + } + files := fs.GetFiles() + paths := make([]string, 0) + for p, _ := range files { + paths = append(paths, p) + } + assert.Equal(t, test.expectedFiles, paths) + }) + } +} + +func TestFileWatcherRenamedFile(t *testing.T) { + testPath := mustAbsPath("first_name") + renamedPath := mustAbsPath("renamed") + + f, err := os.Create(testPath) + if err != nil { + t.Fatal(err) + } + f.Close() + fi, err := os.Stat(testPath) + if err != nil { + t.Fatal(err) + } + + cfg := fileScannerConfig{ + ExcludedFiles: nil, + Symlinks: false, + RecursiveGlob: false, + } + scanner, err := newFileScanner([]string{testPath, renamedPath}, cfg) + if err != nil { + t.Fatal(err) + } + w := fileWatcher{ + log: logp.L(), + scanner: scanner, + events: make(chan loginp.FSEvent), + } + + go w.watch(context.Background()) + assert.Equal(t, loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: testPath, Info: fi}, w.Event()) + + err = os.Rename(testPath, renamedPath) + if err != nil { + t.Fatal(err) + } + defer os.Remove(renamedPath) + fi, err = os.Stat(renamedPath) + if err != nil { + t.Fatal(err) + } + + go w.watch(context.Background()) + evt := w.Event() + + assert.Equal(t, loginp.OpRename, evt.Op) + assert.Equal(t, testPath, evt.OldPath) + assert.Equal(t, renamedPath, evt.NewPath) +}