diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fdaddde7a39f..c50c838c2dd2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -347,7 +347,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Improve CEL input performance. {pull}35915[35915] - Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044] - Add `clean_session` configuration setting for MQTT input. {pull}35806[16204] - +- Add fingerprint mode for the filestream scanner and new file identity based on it {issue}34419[34419] {pull}35734[35734] *Auditbeat* - Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817] diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index e3424de1914b..68ff3d22e07b 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -300,6 +300,19 @@ filebeat.inputs: # original for harvesting but will report the symlink name as the source. #prospector.scanner.symlinks: false + # If enabled, instead of relying on the device ID and inode values when comparing files, + # compare hashes of the given byte ranges in files. A file becomes an ingest target + # when its size grows larger than offset+length (see below). Until then it's ignored. + #prospector.scanner.fingerprint.enabled: false + + # If fingerprint mode is enabled, sets the offset from the beginning of the file + # for the byte range used for computing the fingerprint value. + #prospector.scanner.fingerprint.offset: 0 + + # If fingerprint mode is enabled, sets the length of the byte range used for + # computing the fingerprint value. Cannot be less than 64 bytes. 
+ #prospector.scanner.fingerprint.length: 1024 + ### Parsers configuration #### JSON configuration diff --git a/filebeat/docs/inputs/input-common-file-options.asciidoc b/filebeat/docs/inputs/input-common-file-options.asciidoc index c913e4e33808..7d62c5827fec 100644 --- a/filebeat/docs/inputs/input-common-file-options.asciidoc +++ b/filebeat/docs/inputs/input-common-file-options.asciidoc @@ -77,10 +77,10 @@ certain criteria or time. Closing the harvester means closing the file handler. If a file is updated after the harvester is closed, the file will be picked up again after `scan_frequency` has elapsed. However, if the file is moved or deleted while the harvester is closed, {beatname_uc} will not be able to pick up -the file again, and any data that the harvester hasn't read will be lost. -The `close_*` settings are applied synchronously when {beatname_uc} attempts +the file again, and any data that the harvester hasn't read will be lost. +The `close_*` settings are applied synchronously when {beatname_uc} attempts to read from a file, meaning that if {beatname_uc} is in a blocked state -due to blocked output, full queue or other issue, a file that would +due to blocked output, full queue or other issue, a file that would otherwise be closed remains open until {beatname_uc} once again attempts to read from the file. @@ -240,7 +240,7 @@ that should be removed based on the `clean_inactive` setting. This happens because {beatname_uc} doesn't remove the entries until it opens the registry again to read a different file. If you are testing the `clean_inactive` setting, make sure {beatname_uc} is configured to read from more than one file, or the -file state will never be removed from the registry. +file state will never be removed from the registry. 
[float] [id="{beatname_lc}-input-{type}-clean-removed"] @@ -441,4 +441,3 @@ Set the location of the marker file the following way: ---- file_identity.inode_marker.path: /logs/.filebeat-marker ---- - diff --git a/filebeat/docs/inputs/input-filestream-file-options.asciidoc b/filebeat/docs/inputs/input-filestream-file-options.asciidoc index 68629d32e8ae..8b1bd083668f 100644 --- a/filebeat/docs/inputs/input-filestream-file-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-file-options.asciidoc @@ -146,6 +146,62 @@ stays open and constantly polls your files. The default setting is 10s. +[float] +[id="{beatname_lc}-input-{type}-scan-fingerprint"] +===== `prospector.scanner.fingerprint` + +Instead of relying on the device ID and inode values when comparing files, compare hashes of the given byte ranges of files. + +Enable this option if you're experiencing data loss or data duplication due to unstable file identifiers provided by the file system. + +Following are some scenarios where this can happen: + +. Some file systems (i.e. in Docker) cache and re-use inodes ++ +for example if you: ++ +.. Create a file (`touch x`) +.. Check the file's inode (`ls -i x`) +.. Delete the file (`rm x`) +.. Create a new file right away (`touch y`) +.. Check the inode of the new file (`ls -i y`) ++ + +For both files you might see the same inode value despite even having different filenames. ++ +. Non-Ext file systems can change inodes: ++ +Ext file systems store the inode number in the `i_ino` file, inside a struct `inode`, which is written to disk. In this case, if the file is the same (not another file with the same name) then the inode number is guaranteed to be the same. ++ +If the file system is other than Ext, the inode number is generated by the inode operations defined by the file system driver. 
As they don't have the concept of what an inode is, they have to mimic all of the inode's internal fields to comply with VFS, so this number will probably be different after a reboot, even after closing and opening the file again (theoretically). ++ +. Some file processing tools change inode values ++ +Sometimes users unintentionally change inodes by using tools like `rsync` or `sed`. ++ +. Some operating systems change device IDs after reboot ++ +Depending on a mounting approach, the device ID (which is also used for comparing files) might change after a reboot. + +**Configuration** + +Fingerprint mode is disabled by default. + +WARNING: Enabling fingerprint mode delays ingesting new files until they grow to at least `offset`+`length` bytes in size, so they can be fingerprinted. Until then these files are ignored. + +Normally, log lines contain timestamps and other unique fields that should be able to use the fingerprint mode, +but in every use-case users should inspect their logs to determine what are the appropriate values for +the `offset` and `length` parameters. Default `offset` is `0` and default `length` is `1024` or 1 KB. `length` cannot be less than `64`. + +[source,yaml] +---- +fingerprint: + enabled: false + offset: 0 + length: 1024 +---- + + [float] [id="{beatname_lc}-input-{type}-ignore-older"] ===== `ignore_older` @@ -502,6 +558,17 @@ Set the location of the marker file the following way: file_identity.inode_marker.path: /logs/.filebeat-marker ---- +*`fingerprint`*:: To identify files based on their content byte range. + +WARNING: In order to use this file identity option, you must enable the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint option in the scanner>>. Once this file identity is enabled, changing the fingerprint configuration (offset, length, or other settings) will lead to a global re-ingestion of all files that match the paths configuration of the input. 
+ +Please refer to the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint configuration for details>>. + +[source,yaml] +---- +file_identity.fingerprint: ~ +---- + [[filestream-log-rotation-support]] [float] === Log rotation diff --git a/filebeat/docs/inputs/input-filestream.asciidoc b/filebeat/docs/inputs/input-filestream.asciidoc index bb9e32e235c6..e55ff6114962 100644 --- a/filebeat/docs/inputs/input-filestream.asciidoc +++ b/filebeat/docs/inputs/input-filestream.asciidoc @@ -95,7 +95,7 @@ device IDs. However, on network shares and cloud providers these values might change during the lifetime of the file. If this happens {beatname_uc} thinks that file is new and resends the whole content of the file. To solve this problem you can configure `file_identity` option. Possible -values besides the default `inode_deviceid` are `path` and `inode_marker`. +values besides the default `inode_deviceid` are `path`, `inode_marker` and `fingerprint`. WARNING: Changing `file_identity` methods between runs may result in duplicated events in the output. @@ -116,6 +116,13 @@ example oneliner generates a hidden marker file for the selected mountpoint `/lo Please note that you should not use this option on Windows as file identifiers might be more volatile. +Selecting `fingerprint` instructs {beatname_uc} to identify files based on their +content byte range. + +WARNING: In order to use this file identity option, one must enable the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint option in the scanner>>. Once this file identity is enabled, changing the fingerprint configuration (offset, length, etc) will lead to a global re-ingestion of all files that match the paths configuration of the input. + +Please refer to the <<{beatname_lc}-input-filestream-scan-fingerprint,fingerprint configuration for details>>. 
+ ["source","sh",subs="attributes"] ---- $ lsblk -o MOUNTPOINT,UUID | grep /logs | awk '{print $2}' >> /logs/.filebeat-marker diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 87053c242134..2de0bc61f565 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -707,6 +707,19 @@ filebeat.inputs: # original for harvesting but will report the symlink name as the source. #prospector.scanner.symlinks: false + # If enabled, instead of relying on the device ID and inode values when comparing files, + # compare hashes of the given byte ranges in files. A file becomes an ingest target + # when its size grows larger than offset+length (see below). Until then it's ignored. + #prospector.scanner.fingerprint.enabled: false + + # If fingerprint mode is enabled, sets the offset from the beginning of the file + # for the byte range used for computing the fingerprint value. + #prospector.scanner.fingerprint.offset: 0 + + # If fingerprint mode is enabled, sets the length of the byte range used for + # computing the fingerprint value. Cannot be less than 64 bytes. 
+ #prospector.scanner.fingerprint.length: 1024 + ### Parsers configuration #### JSON configuration diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index a9ca55d8eb25..f6e1ca03f4c6 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -33,6 +33,7 @@ import ( type config struct { Reader readerConfig `config:",inline"` + ID string `config:"id"` Paths []string `config:"paths"` Close closerConfig `config:"close"` FileWatcher *conf.Namespace `config:"prospector"` diff --git a/filebeat/input/filestream/copytruncate_prospector.go b/filebeat/input/filestream/copytruncate_prospector.go index 1f12f167a4c2..10884cb9b94e 100644 --- a/filebeat/input/filestream/copytruncate_prospector.go +++ b/filebeat/input/filestream/copytruncate_prospector.go @@ -329,7 +329,9 @@ func (p *copyTruncateFileProspector) onRotatedFile( hg.Start(ctx, src) return } - originalSrc := p.identifier.GetSource(loginp.FSEvent{NewPath: originalPath, Info: fi}) + descCopy := fe.Descriptor + descCopy.Info = fi + originalSrc := p.identifier.GetSource(loginp.FSEvent{NewPath: originalPath, Descriptor: descCopy}) p.rotatedFiles.addOriginalFile(originalPath, originalSrc) p.rotatedFiles.addRotatedFile(originalPath, fe.NewPath, src) hg.Start(ctx, src) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 2c121fa8c4a8..cc53d23a2147 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -374,7 +374,7 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e func getIDFromPath(filepath, inputID string, fi os.FileInfo) string { identifier, _ := newINodeDeviceIdentifier(nil) - src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath}) + src := identifier.GetSource(loginp.FSEvent{Descriptor: loginp.FileDescriptor{Info: fi}, Op: loginp.OpCreate, NewPath: filepath}) return 
"filestream::" + inputID + "::" + src.Name() } diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 68ff28f47ddf..371465e9b352 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -18,7 +18,10 @@ package filestream import ( + "crypto/sha256" + "encoding/hex" "fmt" + "io" "os" "path/filepath" "time" @@ -28,35 +31,18 @@ import ( "github.com/elastic/beats/v7/filebeat/input/file" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" - file_helper "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/common/match" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) const ( - RecursiveGlobDepth = 8 - scannerName = "scanner" - watcherDebugKey = "file_watcher" + RecursiveGlobDepth = 8 + DefaultFingerprintSize int64 = 1024 // 1KB + scannerDebugKey = "scanner" + watcherDebugKey = "file_watcher" ) -var watcherFactories = map[string]watcherFactory{ - scannerName: newScannerWatcher, -} - -type watcherFactory func(paths []string, cfg *conf.C) (loginp.FSWatcher, error) - -// fileScanner looks for files which match the patterns in paths. -// It is able to exclude files and symlinks. -type fileScanner struct { - paths []string - excludedFiles []match.Matcher - includedFiles []match.Matcher - symlinks bool - - log *logp.Logger -} - type fileWatcherConfig struct { // Interval is the time between two scans. Interval time.Duration `config:"check_interval"` @@ -70,27 +56,22 @@ type fileWatcherConfig struct { // fileWatcher gets the list of files from a FSWatcher and creates events by // comparing the files between its last two runs. 
type fileWatcher struct { - interval time.Duration - resendOnModTime bool - prev map[string]os.FileInfo - scanner loginp.FSScanner - log *logp.Logger - events chan loginp.FSEvent - sameFileFunc func(os.FileInfo, os.FileInfo) bool + cfg fileWatcherConfig + prev map[string]loginp.FileDescriptor + scanner loginp.FSScanner + log *logp.Logger + events chan loginp.FSEvent } func newFileWatcher(paths []string, ns *conf.Namespace) (loginp.FSWatcher, error) { + var config *conf.C if ns == nil { - return newScannerWatcher(paths, conf.NewConfig()) + config = conf.NewConfig() + } else { + config = ns.Config() } - watcherType := ns.Name() - f, ok := watcherFactories[watcherType] - if !ok { - return nil, fmt.Errorf("no such file watcher: %s", watcherType) - } - - return f(paths, ns.Config()) + return newScannerWatcher(paths, config) } func newScannerWatcher(paths []string, c *conf.C) (loginp.FSWatcher, error) { @@ -104,13 +85,11 @@ func newScannerWatcher(paths []string, c *conf.C) (loginp.FSWatcher, error) { return nil, err } return &fileWatcher{ - log: logp.NewLogger(watcherDebugKey), - interval: config.Interval, - resendOnModTime: config.ResendOnModTime, - prev: make(map[string]os.FileInfo, 0), - scanner: scanner, - events: make(chan loginp.FSEvent), - sameFileFunc: os.SameFile, + log: logp.NewLogger(watcherDebugKey), + cfg: config, + prev: make(map[string]loginp.FileDescriptor, 0), + scanner: scanner, + events: make(chan loginp.FSEvent), }, nil } @@ -128,7 +107,7 @@ func (w *fileWatcher) Run(ctx unison.Canceler) { // run initial scan before starting regular w.watch(ctx) - _ = timed.Periodic(ctx, w.interval, func() error { + _ = timed.Periodic(ctx, w.cfg.Interval, func() error { w.watch(ctx) return nil @@ -140,140 +119,192 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { paths := w.scanner.GetFiles() - newFiles := make(map[string]os.FileInfo) + // for debugging purposes + writtenCount := 0 + truncatedCount := 0 + renamedCount := 0 + removedCount := 0 + createdCount := 0 - 
for path, info := range paths { + newFilesByName := make(map[string]*loginp.FileDescriptor) + newFilesByID := make(map[string]*loginp.FileDescriptor) + for path, fd := range paths { // if the scanner found a new path or an existing path // with a different file, it is a new file - prevInfo, ok := w.prev[path] - if !ok || !w.sameFileFunc(prevInfo, info) { - newFiles[path] = info + prevDesc, ok := w.prev[path] + sfd := fd // to avoid memory aliasing + if !ok || !loginp.SameFile(&prevDesc, &sfd) { + newFilesByName[path] = &sfd + newFilesByID[fd.FileID()] = &sfd continue } - // if the two infos belong to the same file and it has been modified - // if the size is smaller than before, it is truncated, if bigger, it is a write event. - // It might happen that a file is truncated and then more data is added, both - // within the same second, this will make the reader stop, but a new one will not - // start because the modification data is the same, to avoid this situation, - // we also check for size changes here. 
- if prevInfo.ModTime() != info.ModTime() || prevInfo.Size() != info.Size() { - if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() { - select { - case <-ctx.Done(): - return - case w.events <- truncateEvent(path, info): - } - } else { - select { - case <-ctx.Done(): - return - case w.events <- writeEvent(path, info): - } + var e loginp.FSEvent + switch { + + // the new size is smaller, the file was truncated + case prevDesc.Info.Size() > fd.Info.Size(): + e = truncateEvent(path, fd) + truncatedCount++ + + // the size is the same, timestamps are different, the file was touched + case prevDesc.Info.Size() == fd.Info.Size() && prevDesc.Info.ModTime() != fd.Info.ModTime(): + if w.cfg.ResendOnModTime { + e = truncateEvent(path, fd) + truncatedCount++ } + + // the new size is larger, something was written + case prevDesc.Info.Size() < fd.Info.Size(): + e = writeEvent(path, fd) + writtenCount++ } - // delete from previous state, as we have more up to date info + // if none of the conditions were true, the file remained unchanged and we don't need to create an event + if e.Op != loginp.OpDone { + select { + case <-ctx.Done(): + return + case w.events <- e: + } + } + + // delete from previous state to mark that we've seen the existing file again delete(w.prev, path) } - // remaining files are in the prev map are the ones that are missing + // remaining files in the prev map are the ones that are missing // either because they have been deleted or renamed - for removedPath, removedInfo := range w.prev { - for newPath, newInfo := range newFiles { - if w.sameFileFunc(removedInfo, newInfo) { - select { - case <-ctx.Done(): - return - case w.events <- renamedEvent(removedPath, newPath, newInfo): - delete(newFiles, newPath) - goto CHECK_NEXT_REMOVED - } - } + for remainingPath, remainingDesc := range w.prev { + var e loginp.FSEvent + + id := remainingDesc.FileID() + if newDesc, renamed := newFilesByID[id]; renamed { + e = 
renamedEvent(remainingPath, newDesc.Filename, *newDesc) + delete(newFilesByName, newDesc.Filename) + delete(newFilesByID, id) + renamedCount++ + } else { + e = deleteEvent(remainingPath, remainingDesc) + removedCount++ } - select { case <-ctx.Done(): return - case w.events <- deleteEvent(removedPath, removedInfo): + case w.events <- e: } - CHECK_NEXT_REMOVED: } - // remaining files in newFiles are new - for path, info := range newFiles { + // remaining files in newFiles are newly created files + for path, fd := range newFilesByName { select { case <-ctx.Done(): return - case w.events <- createEvent(path, info): + case w.events <- createEvent(path, *fd): + createdCount++ } } - w.log.Debugf("Found %d paths", len(paths)) + w.log.With( + "total", len(paths), + "written", writtenCount, + "truncated", truncatedCount, + "renamed", renamedCount, + "removed", removedCount, + "created", createdCount, + ).Debugf("File scan complete") + w.prev = paths } -func createEvent(path string, fi os.FileInfo) loginp.FSEvent { - return loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: path, Info: fi} +func createEvent(path string, fd loginp.FileDescriptor) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: path, Descriptor: fd} } -func writeEvent(path string, fi os.FileInfo) loginp.FSEvent { - return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi} +func writeEvent(path string, fd loginp.FileDescriptor) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Descriptor: fd} } -func truncateEvent(path string, fi os.FileInfo) loginp.FSEvent { - return loginp.FSEvent{Op: loginp.OpTruncate, OldPath: path, NewPath: path, Info: fi} +func truncateEvent(path string, fd loginp.FileDescriptor) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpTruncate, OldPath: path, NewPath: path, Descriptor: fd} } -func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent { - return 
loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Info: fi} +func renamedEvent(oldPath, path string, fd loginp.FileDescriptor) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Descriptor: fd} } -func deleteEvent(path string, fi os.FileInfo) loginp.FSEvent { - return loginp.FSEvent{Op: loginp.OpDelete, OldPath: path, NewPath: "", Info: fi} +func deleteEvent(path string, fd loginp.FileDescriptor) loginp.FSEvent { + return loginp.FSEvent{Op: loginp.OpDelete, OldPath: path, NewPath: "", Descriptor: fd} } func (w *fileWatcher) Event() loginp.FSEvent { return <-w.events } -func (w *fileWatcher) GetFiles() map[string]os.FileInfo { +func (w *fileWatcher) GetFiles() map[string]loginp.FileDescriptor { return w.scanner.GetFiles() } +type fingerprintConfig struct { + Enabled bool `config:"enabled"` + Offset int64 `config:"offset"` + Length int64 `config:"length"` +} + type fileScannerConfig struct { - ExcludedFiles []match.Matcher `config:"exclude_files"` - IncludedFiles []match.Matcher `config:"include_files"` - Symlinks bool `config:"symlinks"` - RecursiveGlob bool `config:"recursive_glob"` + ExcludedFiles []match.Matcher `config:"exclude_files"` + IncludedFiles []match.Matcher `config:"include_files"` + Symlinks bool `config:"symlinks"` + RecursiveGlob bool `config:"recursive_glob"` + Fingerprint fingerprintConfig `config:"fingerprint"` } func defaultFileScannerConfig() fileScannerConfig { return fileScannerConfig{ Symlinks: false, RecursiveGlob: true, + Fingerprint: fingerprintConfig{ + Enabled: false, + Offset: 0, + Length: DefaultFingerprintSize, + }, } } -func newFileScanner(paths []string, cfg fileScannerConfig) (loginp.FSScanner, error) { - fs := fileScanner{ - paths: paths, - excludedFiles: cfg.ExcludedFiles, - includedFiles: cfg.IncludedFiles, - symlinks: cfg.Symlinks, - log: logp.NewLogger(scannerName), +// fileScanner looks for files which match the patterns in paths. 
+// It is able to exclude files and symlinks. +type fileScanner struct { + paths []string + cfg fileScannerConfig + log *logp.Logger +} + +func newFileScanner(paths []string, config fileScannerConfig) (loginp.FSScanner, error) { + s := fileScanner{ + paths: paths, + cfg: config, + log: logp.NewLogger(scannerDebugKey), } - err := fs.resolveRecursiveGlobs(cfg) + + if s.cfg.Fingerprint.Enabled { + if s.cfg.Fingerprint.Length < sha256.BlockSize { + err := fmt.Errorf("fingerprint size %d bytes cannot be smaller than %d bytes", config.Fingerprint.Length, sha256.BlockSize) + return nil, fmt.Errorf("error while reading configuration of fingerprint: %w", err) + } + s.log.Debugf("fingerprint mode enabled: offset %d, length %d", s.cfg.Fingerprint.Offset, s.cfg.Fingerprint.Length) + } + + err := s.resolveRecursiveGlobs(config) if err != nil { return nil, err } - err = fs.normalizeGlobPatterns() + err = s.normalizeGlobPatterns() if err != nil { return nil, err } - return &fs, nil + return &s, nil } // resolveRecursiveGlobs expands `**` from the globs in multiple patterns @@ -313,11 +344,12 @@ func (s *fileScanner) normalizeGlobPatterns() error { return nil } -// GetFiles returns a map of files and fileinfos which +// GetFiles returns a map of file descriptors by filenames that // match the configured paths. 
-func (s *fileScanner) GetFiles() map[string]os.FileInfo { - pathInfo := map[string]os.FileInfo{} - uniqFileID := map[string]os.FileInfo{} +func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor { + fdByName := map[string]loginp.FileDescriptor{} + // used to determine if a symlink resolves in a already known target + uniqueIDs := map[string]string{} for _, path := range s.paths { matches, err := filepath.Glob(path) @@ -326,93 +358,137 @@ func (s *fileScanner) GetFiles() map[string]os.FileInfo { continue } - for _, file := range matches { - if s.shouldSkipFile(file) { + for _, filename := range matches { + it, err := s.getIngestTarget(filename) + if err != nil { + s.log.Debugf("cannot create an ingest target for file %q: %s", filename, err) continue } - // If symlink is enabled, it is checked that original is not part of same input - // If original is harvested by other input, states will potentially overwrite each other - if s.isOriginalAndSymlinkConfigured(file, uniqFileID) { + fd, err := s.toFileDescriptor(&it) + if err != nil { + s.log.Warnf("cannot create a file descriptor for an ingest target %q: %s", filename, err) continue } - fileInfo, err := os.Stat(file) - if err != nil { - s.log.Debug("stat(%s) failed: %s", file, err) + fileID := fd.FileID() + if knownFilename, exists := uniqueIDs[fileID]; exists { + s.log.Warnf("%q points to an already known ingest target %q [%s==%s]. 
Skipping", fd.Filename, knownFilename, fileID, fileID) continue } - pathInfo[file] = fileInfo + uniqueIDs[fileID] = fd.Filename + fdByName[filename] = fd } } - return pathInfo + return fdByName } -func (s *fileScanner) shouldSkipFile(file string) bool { - if s.isFileExcluded(file) || !s.isFileIncluded(file) { - s.log.Debugf("Exclude file: %s", file) - return true - } +type ingestTarget struct { + filename string + originalFilename string + symlink bool + info os.FileInfo +} - fileInfo, err := os.Lstat(file) - if err != nil { - s.log.Debugf("lstat(%s) failed: %s", file, err) - return true +func (s *fileScanner) getIngestTarget(filename string) (it ingestTarget, err error) { + if s.isFileExcluded(filename) { + return it, fmt.Errorf("file %q is excluded from ingestion", filename) } - if fileInfo.IsDir() { - s.log.Debugf("Skipping directory: %s", file) - return true + if !s.isFileIncluded(filename) { + return it, fmt.Errorf("file %q is not included in ingestion", filename) } - isSymlink := fileInfo.Mode()&os.ModeSymlink > 0 - if isSymlink && !s.symlinks { - s.log.Debugf("File %s skipped as it is a symlink", file) - return true - } + it.filename = filename + it.originalFilename = filename - originalFile, err := filepath.EvalSymlinks(file) + it.info, err = os.Lstat(it.filename) // to determine if it's a symlink if err != nil { - s.log.Debugf("finding path to original file has failed %s: %+v", file, err) - return true + return it, fmt.Errorf("failed to lstat %q: %w", it.filename, err) } - // Check if original file is included to make sure we are not reading from - // unwanted files. 
- if s.isFileExcluded(originalFile) || !s.isFileIncluded(originalFile) { - s.log.Debugf("Exclude original file: %s", file) - return true + + if it.info.IsDir() { + return it, fmt.Errorf("file %q is a directory", it.filename) } - return false + it.symlink = it.info.Mode()&os.ModeSymlink > 0 + + if it.symlink { + if !s.cfg.Symlinks { + return it, fmt.Errorf("file %q is a symlink and they're disabled", it.filename) + } + + // now we know it's a symlink, we stat with link resolution + it.info, err = os.Stat(it.filename) + if err != nil { + return it, fmt.Errorf("failed to stat the symlink %q: %w", it.filename, err) + } + + it.originalFilename, err = filepath.EvalSymlinks(it.filename) + if err != nil { + return it, fmt.Errorf("failed to resolve the symlink %q: %w", it.filename, err) + } + + if s.isFileExcluded(it.originalFilename) { + return it, fmt.Errorf("file %q->%q is excluded from ingestion", it.filename, it.originalFilename) + } + + if !s.isFileIncluded(it.originalFilename) { + return it, fmt.Errorf("file %q->%q is not included in ingestion", it.filename, it.originalFilename) + } + } + + return it, nil } -func (s *fileScanner) isOriginalAndSymlinkConfigured(file string, uniqFileID map[string]os.FileInfo) bool { - if s.symlinks { - fileInfo, err := os.Stat(file) +func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescriptor, err error) { + fd.Filename = it.filename + fd.Info = it.info + + if s.cfg.Fingerprint.Enabled { + fileSize := it.info.Size() + minSize := s.cfg.Fingerprint.Offset + s.cfg.Fingerprint.Length + if fileSize < minSize { + return fd, fmt.Errorf("filesize of %q is %d bytes, expected at least %d bytes for fingerprinting", fd.Filename, fileSize, minSize) + } + + h := sha256.New() + file, err := os.Open(it.originalFilename) if err != nil { - s.log.Debugf("stat(%s) failed: %s", file, err) - return false + return fd, fmt.Errorf("failed to open %q for fingerprinting: %w", it.originalFilename, err) } - fileID := 
file_helper.GetOSState(fileInfo).String() - if finfo, exists := uniqFileID[fileID]; exists { - s.log.Infof("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name()) - return true + defer file.Close() + + if s.cfg.Fingerprint.Offset != 0 { + _, err = file.Seek(s.cfg.Fingerprint.Offset, io.SeekStart) + if err != nil { + return fd, fmt.Errorf("failed to seek %q for fingerprinting: %w", fd.Filename, err) + } } - uniqFileID[fileID] = fileInfo + + r := io.LimitReader(file, s.cfg.Fingerprint.Length) + buf := make([]byte, h.BlockSize()) + written, err := io.CopyBuffer(h, r, buf) + if err != nil { + return fd, fmt.Errorf("failed to compute hash for first %d bytes of %q: %w", s.cfg.Fingerprint.Length, fd.Filename, err) + } + if written != s.cfg.Fingerprint.Length { + return fd, fmt.Errorf("failed to read %d bytes from %q to compute fingerprint, read only %d", s.cfg.Fingerprint.Length, fd.Filename, written) + } + + fd.Fingerprint = hex.EncodeToString(h.Sum(nil)) } - return false + + return fd, nil } func (s *fileScanner) isFileExcluded(file string) bool { - return len(s.excludedFiles) > 0 && s.matchAny(s.excludedFiles, file) + return len(s.cfg.ExcludedFiles) > 0 && s.matchAny(s.cfg.ExcludedFiles, file) } func (s *fileScanner) isFileIncluded(file string) bool { - if len(s.includedFiles) == 0 { - return true - } - return s.matchAny(s.includedFiles, file) + return len(s.cfg.IncludedFiles) == 0 || s.matchAny(s.cfg.IncludedFiles, file) } // matchAny checks if the text matches any of the regular expressions diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index da656a1ca0fc..90f2f594c4c0 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -19,301 +19,741 @@ package filestream import ( "context" - "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" 
loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" - "github.com/elastic/beats/v7/libbeat/common/match" - "github.com/elastic/elastic-agent-libs/logp" + conf "github.com/elastic/elastic-agent-libs/config" ) -var ( - excludedFileName = "excluded_file" - includedFileName = "included_file" - directoryPath = "unharvestable_dir" -) +func TestFileWatcher(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 100ms + resend_on_touch: true + symlinks: false + recursive_glob: true + fingerprint: + enabled: false + offset: 0 + length: 1024 +` -func TestFileScanner(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "fswatch_test_file_scanner") - if err != nil { - t.Fatalf("cannot create temporary test dir: %v", err) - } - defer os.RemoveAll(tmpDir) - setupFilesForScannerTest(t, tmpDir) - - excludedFilePath := filepath.Join(tmpDir, excludedFileName) - includedFilePath := filepath.Join(tmpDir, includedFileName) - - testCases := map[string]struct { - paths []string - excludedFiles []match.Matcher - includedFiles []match.Matcher - symlinks bool - expectedFiles []string - }{ - "select all files": { - paths: []string{excludedFilePath, includedFilePath}, - expectedFiles: []string{excludedFilePath, includedFilePath}, - }, - "skip excluded files": { - paths: []string{excludedFilePath, includedFilePath}, - excludedFiles: []match.Matcher{ - match.MustCompile(excludedFileName), - }, - expectedFiles: []string{includedFilePath}, - }, - "only include included_files": { - paths: []string{excludedFilePath, includedFilePath}, - includedFiles: []match.Matcher{ - match.MustCompile(includedFileName), - }, - expectedFiles: []string{includedFilePath}, - }, - "skip directories": { - paths: []string{filepath.Join(tmpDir, directoryPath)}, - expectedFiles: []string{}, - }, - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - for name, test := range 
testCases { - test := test - - t.Run(name, func(t *testing.T) { - cfg := fileScannerConfig{ - ExcludedFiles: test.excludedFiles, - IncludedFiles: test.includedFiles, - Symlinks: test.symlinks, - RecursiveGlob: false, - } - fs, err := newFileScanner(test.paths, cfg) - if err != nil { - t.Fatal(err) - } - files := fs.GetFiles() - paths := make([]string, 0) - for p := range files { - paths = append(paths, p) - } - assert.ElementsMatch(t, paths, test.expectedFiles) - }) - } -} + fw := createWatcherWithConfig(t, paths, cfgStr) -func setupFilesForScannerTest(t *testing.T, tmpDir string) { - err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 0750) - if err != nil { - t.Fatalf("cannot create non harvestable directory: %v", err) - } - for _, path := range []string{excludedFileName, includedFileName} { - f, err := os.Create(filepath.Join(tmpDir, path)) - if err != nil { - t.Fatalf("file %s, error %v", path, err) + go fw.Run(ctx) + + t.Run("detects a new file", func(t *testing.T) { + basename := "created.log" + filename := filepath.Join(dir, basename) + err := os.WriteFile(filename, []byte("hello"), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + Op: loginp.OpCreate, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 5}, // 5 bytes written + }, } + requireEqualEvents(t, expEvent, e) + }) + + t.Run("detects a file write", func(t *testing.T) { + basename := "created.log" + filename := filepath.Join(dir, basename) + + f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0777) + require.NoError(t, err) + _, err = f.WriteString("world") + require.NoError(t, err) f.Close() - } -} -func TestFileWatchNewDeleteModified(t *testing.T) { - oldTs := time.Now() - newTs := oldTs.Add(5 * time.Second) - testCases := map[string]struct { - prevFiles map[string]os.FileInfo - nextFiles map[string]os.FileInfo - expectedEvents []loginp.FSEvent - }{ - "one new file": { - prevFiles: 
map[string]os.FileInfo{}, - nextFiles: map[string]os.FileInfo{ - "new_path": testFileInfo{"new_path", 5, oldTs, nil}, - }, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpCreate, OldPath: "", NewPath: "new_path", Info: testFileInfo{"new_path", 5, oldTs, nil}}, - }, - }, - "one deleted file": { - prevFiles: map[string]os.FileInfo{ - "old_path": testFileInfo{"old_path", 5, oldTs, nil}, - }, - nextFiles: map[string]os.FileInfo{}, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpDelete, OldPath: "old_path", NewPath: "", Info: testFileInfo{"old_path", 5, oldTs, nil}}, + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + OldPath: filename, + Op: loginp.OpWrite, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 10}, // +5 bytes appended }, - }, - "one modified file": { - prevFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 5, oldTs, nil}, - }, - nextFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 10, newTs, nil}, - }, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpWrite, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 10, newTs, nil}}, - }, - }, - "two modified files": { - prevFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 5, oldTs, nil}, - "path2": testFileInfo{"path2", 5, oldTs, nil}, - }, - nextFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 10, newTs, nil}, - "path2": testFileInfo{"path2", 10, newTs, nil}, - }, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs, nil}}, - {Op: loginp.OpWrite, OldPath: "path2", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs, nil}}, + } + requireEqualEvents(t, expEvent, e) + }) + + t.Run("detects a file rename", func(t *testing.T) { + basename := "created.log" + filename := filepath.Join(dir, basename) + newBasename := "renamed.log" + newFilename := filepath.Join(dir, newBasename) + + err := 
os.Rename(filename, newFilename) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: newFilename, + OldPath: filename, + Op: loginp.OpRename, + Descriptor: loginp.FileDescriptor{ + Filename: newFilename, + Info: testFileInfo{path: newBasename, size: 10}, }, - }, - "one modified file, one new file": { - prevFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 5, oldTs, nil}, + } + requireEqualEvents(t, expEvent, e) + }) + + t.Run("detects a file truncate", func(t *testing.T) { + basename := "renamed.log" + filename := filepath.Join(dir, basename) + + err := os.Truncate(filename, 2) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + OldPath: filename, + Op: loginp.OpTruncate, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 2}, }, - nextFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 10, newTs, nil}, - "path2": testFileInfo{"path2", 10, newTs, nil}, + } + requireEqualEvents(t, expEvent, e) + }) + + t.Run("emits truncate on touch when resend_on_touch is enabled", func(t *testing.T) { + basename := "renamed.log" + filename := filepath.Join(dir, basename) + time := time.Now().Local().Add(time.Hour) + err := os.Chtimes(filename, time, time) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + OldPath: filename, + Op: loginp.OpTruncate, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 2}, }, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs, nil}}, - {Op: loginp.OpCreate, OldPath: "", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs, nil}}, + } + requireEqualEvents(t, expEvent, e) + }) + + t.Run("detects a file remove", func(t *testing.T) { + basename := "renamed.log" + filename := filepath.Join(dir, basename) + + err := 
os.Remove(filename) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + OldPath: filename, + Op: loginp.OpDelete, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 2}, }, - }, - "one new file, one deleted file": { - prevFiles: map[string]os.FileInfo{ - "path_deleted": testFileInfo{"path_deleted", 5, oldTs, nil}, + } + requireEqualEvents(t, expEvent, e) + }) + + t.Run("propagates a fingerprints for a new file", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 100ms + symlinks: false + recursive_glob: true + fingerprint: + enabled: true + offset: 0 + length: 1024 +` + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err := os.WriteFile(filename, []byte(strings.Repeat("a", 1024)), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + Op: loginp.OpCreate, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Fingerprint: "2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a", + Info: testFileInfo{path: basename, size: 1024}, }, - nextFiles: map[string]os.FileInfo{ - "path_new": testFileInfo{"path_new", 10, newTs, nil}, + } + requireEqualEvents(t, expEvent, e) + }) + + t.Run("does not emit events if a file is touched and resend_on_touch is disabled", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 100ms +` + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err := os.WriteFile(filename, 
[]byte(strings.Repeat("a", 1024)), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + Op: loginp.OpCreate, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 1024}, }, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpDelete, OldPath: "path_deleted", NewPath: "", Info: testFileInfo{"path_deleted", 5, oldTs, nil}}, - {Op: loginp.OpCreate, OldPath: "", NewPath: "path_new", Info: testFileInfo{"path_new", 10, newTs, nil}}, + } + requireEqualEvents(t, expEvent, e) + + time := time.Now().Local().Add(time.Hour) + err = os.Chtimes(filename, time, time) + require.NoError(t, err) + + e = fw.Event() + require.Equal(t, loginp.OpDone, e.Op) + }) + + t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 100ms + fingerprint.enabled: true +` + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err := os.WriteFile(filename, []byte(strings.Repeat("a", 1024)), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + Op: loginp.OpCreate, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Fingerprint: "2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a", + Info: testFileInfo{path: basename, size: 1024}, }, - }, - } + } + requireEqualEvents(t, expEvent, e) - for name, test := range testCases { - test := test + // collisions are resolved in the alphabetical order, the first filename wins + basename = "created_collision.log" + filename = filepath.Join(dir, basename) + err = os.WriteFile(filename, []byte(strings.Repeat("a", 1024)), 0777) + require.NoError(t, err) - t.Run(name, func(t *testing.T) { - 
w := fileWatcher{ - log: logp.L(), - prev: test.prevFiles, - scanner: &mockScanner{test.nextFiles}, - events: make(chan loginp.FSEvent), - sameFileFunc: testSameFile, - } + e = fw.Event() + // means no event + require.Equal(t, loginp.OpDone, e.Op) + }) +} - go w.watch(context.Background()) +func TestFileScanner(t *testing.T) { + dir := t.TempDir() + dir2 := t.TempDir() // for symlink testing + paths := []string{filepath.Join(dir, "*.log")} - count := len(test.expectedEvents) - actual := make([]loginp.FSEvent, count) - for i := 0; i < count; i++ { - actual[i] = w.Event() - } + normalBasename := "normal.log" + undersizedBasename := "undersized.log" + excludedBasename := "excluded.log" + excludedIncludedBasename := "excluded_included.log" + travelerBasename := "traveler.log" + normalSymlinkBasename := "normal_symlink.log" + exclSymlinkBasename := "excl_symlink.log" + travelerSymlinkBasename := "portal.log" - assert.ElementsMatch(t, actual, test.expectedEvents) - }) + normalFilename := filepath.Join(dir, normalBasename) + undersizedFilename := filepath.Join(dir, undersizedBasename) + excludedFilename := filepath.Join(dir, excludedBasename) + excludedIncludedFilename := filepath.Join(dir, excludedIncludedBasename) + travelerFilename := filepath.Join(dir2, travelerBasename) + normalSymlinkFilename := filepath.Join(dir, normalSymlinkBasename) + exclSymlinkFilename := filepath.Join(dir, exclSymlinkBasename) + travelerSymlinkFilename := filepath.Join(dir, travelerSymlinkBasename) + + files := map[string]string{ + normalFilename: strings.Repeat("a", 1024), + undersizedFilename: strings.Repeat("a", 128), + excludedFilename: strings.Repeat("nothing to see here", 1024), + excludedIncludedFilename: strings.Repeat("perhaps something to see here", 1024), + travelerFilename: strings.Repeat("folks, I think I got lost", 1024), } -} -func TestFileWatcherTruncate(t *testing.T) { - oldTs := time.Now() - newTs := oldTs.Add(time.Second) - testCases := map[string]struct { - prevFiles 
map[string]os.FileInfo - nextFiles map[string]os.FileInfo - expectedEvents []loginp.FSEvent + sizes := make(map[string]int64, len(files)) + for filename, content := range files { + sizes[filename] = int64(len(content)) + } + for filename, content := range files { + err := os.WriteFile(filename, []byte(content), 0777) + require.NoError(t, err) + } + + // this is to test that a symlink for a known file does not add the file twice + err := os.Symlink(normalFilename, normalSymlinkFilename) + require.NoError(t, err) + + // this is to test that a symlink for an unknown file is added once + err = os.Symlink(travelerFilename, travelerSymlinkFilename) + require.NoError(t, err) + + // this is to test that a symlink to an excluded file is not added + err = os.Symlink(exclSymlinkFilename, exclSymlinkFilename) + require.NoError(t, err) + + // this is to test that directories are handled and excluded + err = os.Mkdir(filepath.Join(dir, "dir"), 0777) + require.NoError(t, err) + + cases := []struct { + name string + cfgStr string + expDesc map[string]loginp.FileDescriptor }{ - "truncated file, only size changes": { - prevFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 42, oldTs, nil}, + { + name: "returns all files when no limits, not including the repeated symlink", + cfgStr: ` +scanner: + symlinks: true + recursive_glob: true + fingerprint: + enabled: false + offset: 0 + length: 1024 +`, + expDesc: map[string]loginp.FileDescriptor{ + normalFilename: { + Filename: normalFilename, + Info: testFileInfo{ + size: sizes[normalFilename], + path: normalBasename, + }, + }, + undersizedFilename: { + Filename: undersizedFilename, + Info: testFileInfo{ + size: sizes[undersizedFilename], + path: undersizedBasename, + }, + }, + excludedFilename: { + Filename: excludedFilename, + Info: testFileInfo{ + size: sizes[excludedFilename], + path: excludedBasename, + }, + }, + excludedIncludedFilename: { + Filename: excludedIncludedFilename, + Info: testFileInfo{ + size: 
sizes[excludedIncludedFilename], + path: excludedIncludedBasename, + }, + }, + travelerSymlinkFilename: { + Filename: travelerSymlinkFilename, + Info: testFileInfo{ + size: sizes[travelerFilename], + path: travelerSymlinkBasename, + }, + }, + }, + }, + { + name: "returns filtered files, excluding symlinks", + cfgStr: ` +scanner: + symlinks: false # symlinks are disabled + recursive_glob: false + fingerprint: + enabled: false + offset: 0 + length: 1024 +`, + expDesc: map[string]loginp.FileDescriptor{ + normalFilename: { + Filename: normalFilename, + Info: testFileInfo{ + size: sizes[normalFilename], + path: normalBasename, + }, + }, + undersizedFilename: { + Filename: undersizedFilename, + Info: testFileInfo{ + size: sizes[undersizedFilename], + path: undersizedBasename, + }, + }, + excludedFilename: { + Filename: excludedFilename, + Info: testFileInfo{ + size: sizes[excludedFilename], + path: excludedBasename, + }, + }, + excludedIncludedFilename: { + Filename: excludedIncludedFilename, + Info: testFileInfo{ + size: sizes[excludedIncludedFilename], + path: excludedIncludedBasename, + }, + }, }, - nextFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 0, oldTs, nil}, + }, + { + name: "returns files according to excluded list", + cfgStr: ` +scanner: + exclude_files: ['.*exclude.*'] + symlinks: true + recursive_glob: true + fingerprint: + enabled: false + offset: 0 + length: 1024 +`, + expDesc: map[string]loginp.FileDescriptor{ + normalFilename: { + Filename: normalFilename, + Info: testFileInfo{ + size: sizes[normalFilename], + path: normalBasename, + }, + }, + undersizedFilename: { + Filename: undersizedFilename, + Info: testFileInfo{ + size: sizes[undersizedFilename], + path: undersizedBasename, + }, + }, + travelerSymlinkFilename: { + Filename: travelerSymlinkFilename, + Info: testFileInfo{ + size: sizes[travelerFilename], + path: travelerSymlinkBasename, + }, + }, }, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpTruncate, OldPath: "path", 
NewPath: "path", Info: testFileInfo{"path", 0, oldTs, nil}}, + }, + { + name: "returns no symlink if the original file is excluded", + cfgStr: ` +scanner: + exclude_files: ['.*exclude.*', '.*traveler.*'] + symlinks: true +`, + expDesc: map[string]loginp.FileDescriptor{ + normalFilename: { + Filename: normalFilename, + Info: testFileInfo{ + size: sizes[normalFilename], + path: normalBasename, + }, + }, + undersizedFilename: { + Filename: undersizedFilename, + Info: testFileInfo{ + size: sizes[undersizedFilename], + path: undersizedBasename, + }, + }, }, }, - "truncated file, mod time and size changes": { - prevFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 42, oldTs, nil}, + { + name: "returns files according to included list", + cfgStr: ` +scanner: + include_files: ['.*include.*'] + symlinks: true + recursive_glob: true + fingerprint: + enabled: false + offset: 0 + length: 1024 +`, + expDesc: map[string]loginp.FileDescriptor{ + excludedIncludedFilename: { + Filename: excludedIncludedFilename, + Info: testFileInfo{ + size: sizes[excludedIncludedFilename], + path: excludedIncludedBasename, + }, + }, }, - nextFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 0, newTs, nil}, + }, + { + name: "returns no included symlink if the original file is not included", + cfgStr: ` +scanner: + include_files: ['.*include.*', '.*portal.*'] + symlinks: true +`, + expDesc: map[string]loginp.FileDescriptor{ + excludedIncludedFilename: { + Filename: excludedIncludedFilename, + Info: testFileInfo{ + size: sizes[excludedIncludedFilename], + path: excludedIncludedBasename, + }, + }, }, - expectedEvents: []loginp.FSEvent{ - {Op: loginp.OpTruncate, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 0, newTs, nil}}, + }, + { + name: "returns an included symlink if the original file is included", + cfgStr: ` +scanner: + include_files: ['.*include.*', '.*portal.*', '.*traveler.*'] + symlinks: true +`, + expDesc: map[string]loginp.FileDescriptor{ + 
excludedIncludedFilename: { + Filename: excludedIncludedFilename, + Info: testFileInfo{ + size: sizes[excludedIncludedFilename], + path: excludedIncludedBasename, + }, + }, + travelerSymlinkFilename: { + Filename: travelerSymlinkFilename, + Info: testFileInfo{ + size: sizes[travelerFilename], + path: travelerSymlinkBasename, + }, + }, }, }, - "no file change": { - prevFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 42, oldTs, nil}, + { + name: "returns all files except too small to fingerprint", + cfgStr: ` +scanner: + symlinks: true + recursive_glob: true + fingerprint: + enabled: true + offset: 0 + length: 1024 +`, + expDesc: map[string]loginp.FileDescriptor{ + normalFilename: { + Filename: normalFilename, + Fingerprint: "2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a", + Info: testFileInfo{ + size: sizes[normalFilename], + path: normalBasename, + }, + }, + excludedFilename: { + Filename: excludedFilename, + Fingerprint: "bd151321c3bbdb44185414a1b56b5649a00206dd4792e7230db8904e43987336", + Info: testFileInfo{ + size: sizes[excludedFilename], + path: excludedBasename, + }, + }, + excludedIncludedFilename: { + Filename: excludedIncludedFilename, + Fingerprint: "bfdb99a65297062658c26dfcea816d76065df2a2da2594bfd9b96e9e405da1c2", + Info: testFileInfo{ + size: sizes[excludedIncludedFilename], + path: excludedIncludedBasename, + }, + }, + travelerSymlinkFilename: { + Filename: travelerSymlinkFilename, + Fingerprint: "c4058942bffcea08810a072d5966dfa5c06eb79b902bf0011890dd8d22e1a5f8", + Info: testFileInfo{ + size: sizes[travelerFilename], + path: travelerSymlinkBasename, + }, + }, }, - nextFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 42, oldTs, nil}, + }, + { + name: "returns all files that match a non-standard fingerprint window", + cfgStr: ` +scanner: + symlinks: true + recursive_glob: true + fingerprint: + enabled: true + offset: 2 + length: 64 +`, + expDesc: map[string]loginp.FileDescriptor{ + normalFilename: { + 
Filename: normalFilename, + Fingerprint: "ffe054fe7ae0cb6dc65c3af9b61d5209f439851db43d0ba5997337df154668eb", + Info: testFileInfo{ + size: sizes[normalFilename], + path: normalBasename, + }, + }, + // undersizedFilename got excluded because of the matching fingerprint + excludedFilename: { + Filename: excludedFilename, + Fingerprint: "9c225a1e6a7df9c869499e923565b93937e88382bb9188145f117195cd41dcd1", + Info: testFileInfo{ + size: sizes[excludedFilename], + path: excludedBasename, + }, + }, + excludedIncludedFilename: { + Filename: excludedIncludedFilename, + Fingerprint: "7985b2b9750bdd3c76903db408aff3859204d6334279eaf516ecaeb618a218d5", + Info: testFileInfo{ + size: sizes[excludedIncludedFilename], + path: excludedIncludedBasename, + }, + }, + travelerSymlinkFilename: { + Filename: travelerSymlinkFilename, + Fingerprint: "da437600754a8eed6c194b7241b078679551c06c7dc89685a9a71be7829ad7e5", + Info: testFileInfo{ + size: sizes[travelerFilename], + path: travelerSymlinkBasename, + }, + }, }, - expectedEvents: []loginp.FSEvent{}, }, } - for name, test := range testCases { - t.Run(name, func(t *testing.T) { - w := fileWatcher{ - log: logp.L(), - prev: test.prevFiles, - scanner: &mockScanner{test.nextFiles}, - events: make(chan loginp.FSEvent, len(test.expectedEvents)), - sameFileFunc: testSameFile, - } - - w.watch(context.Background()) - close(w.events) - - actual := []loginp.FSEvent{} - for evt := range w.events { - actual = append(actual, evt) - } - - if len(actual) != len(test.expectedEvents) { - t.Fatalf("expecting %d elements, got %d", len(test.expectedEvents), len(actual)) - } - for i := range test.expectedEvents { - if test.expectedEvents[i] != actual[i] { - t.Errorf("element [%d] differ. 
Expecting:\n%#v\nGot:\n%#v\n", i, test.expectedEvents[i], actual[i]) - } - } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s := createScannerWithConfig(t, paths, tc.cfgStr) + requireEqualFiles(t, tc.expDesc, s.GetFiles()) }) } + + t.Run("returns error when creating scanner with a fingerprint too small", func(t *testing.T) { + cfgStr := ` +scanner: + fingerprint: + enabled: true + offset: 0 + length: 1 +` + cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr) + require.NoError(t, err) + + ns := &conf.Namespace{} + err = ns.Unpack(cfg) + require.NoError(t, err) + + _, err = newFileWatcher(paths, ns) + require.Error(t, err) + require.Contains(t, err.Error(), "fingerprint size 1 bytes cannot be smaller than 64 bytes") + }) } -type mockScanner struct { - files map[string]os.FileInfo +func createWatcherWithConfig(t *testing.T, paths []string, cfgStr string) loginp.FSWatcher { + cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr) + require.NoError(t, err) + + ns := &conf.Namespace{} + err = ns.Unpack(cfg) + require.NoError(t, err) + + fw, err := newFileWatcher(paths, ns) + require.NoError(t, err) + + return fw } -func (m *mockScanner) GetFiles() map[string]os.FileInfo { - return m.files +func createScannerWithConfig(t *testing.T, paths []string, cfgStr string) loginp.FSScanner { + cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr) + require.NoError(t, err) + + ns := &conf.Namespace{} + err = ns.Unpack(cfg) + require.NoError(t, err) + + config := defaultFileWatcherConfig() + err = ns.Config().Unpack(&config) + require.NoError(t, err) + scanner, err := newFileScanner(paths, config.Scanner) + require.NoError(t, err) + + return scanner } -type testFileInfo struct { - path string - size int64 - time time.Time - sys interface{} +func requireEqualFiles(t *testing.T, expected, actual map[string]loginp.FileDescriptor) { + t.Helper() + require.Equalf(t, len(expected), len(actual), "amount of files does not match:\n\nexpected \n%v\n\n 
actual \n%v\n", filenames(expected), filenames(actual)) + + for expFilename, expFD := range expected { + actFD, exists := actual[expFilename] + require.Truef(t, exists, "the actual file list is missing expected filename %s", expFilename) + requireEqualDescriptors(t, expFD, actFD) + } +} + +func requireEqualEvents(t *testing.T, expected, actual loginp.FSEvent) { + t.Helper() + require.Equal(t, expected.NewPath, actual.NewPath, "NewPath") + require.Equal(t, expected.OldPath, actual.OldPath, "OldPath") + require.Equal(t, expected.Op, actual.Op, "Op") + requireEqualDescriptors(t, expected.Descriptor, actual.Descriptor) } -func (t testFileInfo) Name() string { return t.path } -func (t testFileInfo) Size() int64 { return t.size } -func (t testFileInfo) Mode() os.FileMode { return 0 } -func (t testFileInfo) ModTime() time.Time { return t.time } -func (t testFileInfo) IsDir() bool { return false } -func (t testFileInfo) Sys() interface{} { return t.sys } +func requireEqualDescriptors(t *testing.T, expected, actual loginp.FileDescriptor) { + t.Helper() + require.Equal(t, expected.Filename, actual.Filename, "Filename") + require.Equal(t, expected.Fingerprint, actual.Fingerprint, "Fingerprint") + require.Equal(t, expected.Info.Name(), actual.Info.Name(), "Info.Name()") + require.Equal(t, expected.Info.Size(), actual.Info.Size(), "Info.Size()") +} -func testSameFile(fi1, fi2 os.FileInfo) bool { - return fi1.Name() == fi2.Name() +func filenames(m map[string]loginp.FileDescriptor) (result string) { + for filename := range m { + result += filename + "\n" + } + return result } diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 467ec64f23af..227efad11863 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -19,7 +19,6 @@ package filestream import ( "fmt" - "os" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" 
"github.com/elastic/beats/v7/libbeat/common/file" @@ -36,6 +35,7 @@ const ( nativeName = "native" pathName = "path" inodeMarkerName = "inode_marker" + fingerprintName = "fingerprint" DefaultIdentifierName = nativeName identitySep = "::" @@ -45,6 +45,7 @@ var identifierFactories = map[string]identifierFactory{ nativeName: newINodeDeviceIdentifier, pathName: newPathIdentifier, inodeMarkerName: newINodeMarkerIdentifier, + fingerprintName: newFingerprintIdentifier, } type identifierFactory func(*conf.C) (fileIdentifier, error) @@ -58,19 +59,19 @@ type fileIdentifier interface { // fileSource implements the Source interface // It is required to identify and manage file sources. type fileSource struct { - info os.FileInfo + info loginp.FileDescriptor newPath string oldPath string truncated bool archived bool - name string + fileID string identifierGenerator string } // Name returns the registry identifier of the file. func (f fileSource) Name() string { - return f.name + return f.fileID } // newFileIdentifier creates a new state identifier for a log input. 
@@ -108,12 +109,12 @@ func newINodeDeviceIdentifier(_ *conf.C) (fileIdentifier, error) { func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource { return fileSource{ - info: e.Info, + info: e.Descriptor, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: i.name + identitySep + file.GetOSState(e.Info).String(), + fileID: i.name + identitySep + file.GetOSState(e.Descriptor.Info).String(), identifierGenerator: i.name, } } @@ -147,12 +148,12 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource { path = e.OldPath } return fileSource{ - info: e.Info, + info: e.Descriptor, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: p.name + identitySep + path, + fileID: p.name + identitySep + path, identifierGenerator: p.name, } } @@ -179,7 +180,7 @@ func withSuffix(inner fileIdentifier, suffix string) fileIdentifier { func (s *suffixIdentifier) GetSource(e loginp.FSEvent) fileSource { fs := s.i.GetSource(e) - fs.name += "-" + s.suffix + fs.fileID += "-" + s.suffix return fs } diff --git a/filebeat/input/filestream/identifier_fingerprint.go b/filebeat/input/filestream/identifier_fingerprint.go new file mode 100644 index 000000000000..9451714778a9 --- /dev/null +++ b/filebeat/input/filestream/identifier_fingerprint.go @@ -0,0 +1,59 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +type fingerprintIdentifier struct { + log *logp.Logger +} + +func newFingerprintIdentifier(cfg *conf.C) (fileIdentifier, error) { + return &fingerprintIdentifier{ + log: logp.NewLogger("fingerprint_identifier"), + }, nil +} + +func (i *fingerprintIdentifier) GetSource(e loginp.FSEvent) fileSource { + return fileSource{ + info: e.Descriptor, + newPath: e.NewPath, + oldPath: e.OldPath, + truncated: e.Op == loginp.OpTruncate, + archived: e.Op == loginp.OpArchived, + fileID: fingerprintName + identitySep + e.Descriptor.Fingerprint, + identifierGenerator: fingerprintName, + } +} + +func (i *fingerprintIdentifier) Name() string { + return fingerprintName +} + +func (i *fingerprintIdentifier) Supports(f identifierFeature) bool { + switch f { + case trackRename: + return true + default: + } + return false +} diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go index d5ef0aa6c659..6157c85aa876 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid.go +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ -93,14 +93,14 @@ func (i *inodeMarkerIdentifier) markerContents() string { } func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { - osstate := file.GetOSState(e.Info) + osstate := file.GetOSState(e.Descriptor.Info) 
return fileSource{ - info: e.Info, + info: e.Descriptor, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(), + fileID: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(), identifierGenerator: i.name, } } diff --git a/filebeat/input/filestream/identifier_test.go b/filebeat/input/filestream/identifier_test.go index 4007161116fd..ca67ba375d60 100644 --- a/filebeat/input/filestream/identifier_test.go +++ b/filebeat/input/filestream/identifier_test.go @@ -52,8 +52,8 @@ func TestFileIdentifier(t *testing.T) { } src := identifier.GetSource(loginp.FSEvent{ - NewPath: tmpFile.Name(), - Info: fi, + NewPath: tmpFile.Name(), + Descriptor: loginp.FileDescriptor{Info: fi}, }) assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String(), src.Name()) @@ -76,8 +76,8 @@ func TestFileIdentifier(t *testing.T) { } src := identifier.GetSource(loginp.FSEvent{ - NewPath: tmpFile.Name(), - Info: fi, + NewPath: tmpFile.Name(), + Descriptor: loginp.FileDescriptor{Info: fi}, }) assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String()+"-my-suffix", src.Name()) @@ -129,4 +129,56 @@ func TestFileIdentifier(t *testing.T) { assert.Equal(t, test.expectedSrc, src.Name()) } }) + + t.Run("fingerprint identifier", func(t *testing.T) { + c := conf.MustNewConfigFrom(map[string]interface{}{ + "identifier": map[string]interface{}{ + "fingerprint": nil, + }, + }) + var cfg testFileIdentifierConfig + err := c.Unpack(&cfg) + require.NoError(t, err) + + identifier, err := newFileIdentifier(cfg.Identifier, "") + require.NoError(t, err) + assert.Equal(t, fingerprintName, identifier.Name()) + + testCases := []struct { + newPath string + oldPath string + operation loginp.Operation + desc loginp.FileDescriptor + expectedSrc string + }{ + { + newPath: "/path/to/file", + desc: loginp.FileDescriptor{Fingerprint: "fingerprintvalue"}, + 
expectedSrc: fingerprintName + "::fingerprintvalue", + }, + { + newPath: "/new/path/to/file", + oldPath: "/old/path/to/file", + operation: loginp.OpRename, + desc: loginp.FileDescriptor{Fingerprint: "fingerprintvalue"}, + expectedSrc: fingerprintName + "::fingerprintvalue", + }, + { + oldPath: "/old/path/to/file", + operation: loginp.OpDelete, + desc: loginp.FileDescriptor{Fingerprint: "fingerprintvalue"}, + expectedSrc: fingerprintName + "::fingerprintvalue", + }, + } + + for _, test := range testCases { + src := identifier.GetSource(loginp.FSEvent{ + NewPath: test.newPath, + OldPath: test.oldPath, + Op: test.operation, + Descriptor: test.desc, + }) + assert.Equal(t, test.expectedSrc, src.Name()) + } + }) } diff --git a/filebeat/input/filestream/internal/input-logfile/fswatch.go b/filebeat/input/filestream/internal/input-logfile/fswatch.go index 4f8fffe67414..dc00519c437b 100644 --- a/filebeat/input/filestream/internal/input-logfile/fswatch.go +++ b/filebeat/input/filestream/internal/input-logfile/fswatch.go @@ -21,6 +21,8 @@ import ( "os" "github.com/elastic/go-concert/unison" + + file_helper "github.com/elastic/beats/v7/libbeat/common/file" ) const ( @@ -54,6 +56,33 @@ func (o *Operation) String() string { return name } +// FileDescriptor represents full information about a file. +type FileDescriptor struct { + // Filename is an original filename this descriptor was created from. + // In case it was a symlink, this will be the filename of the symlink unlike + // the filename from the `Info`. + Filename string + // Info is the result of file stat + Info os.FileInfo + // Fingerprint is a computed hash of the file header + Fingerprint string +} + +// FileID returns a unique file ID +// If fingerprint is computed it's used as the ID. +// Otherwise, a combination of the device ID and inode is used. 
+func (fd FileDescriptor) FileID() string { + if fd.Fingerprint != "" { + return fd.Fingerprint + } + return file_helper.GetOSState(fd.Info).String() +} + +// SameFile returns true if descriptors point to the same file. +func SameFile(a, b *FileDescriptor) bool { + return a.FileID() == b.FileID() +} + // FSEvent returns inforamation about file system changes. type FSEvent struct { // NewPath is the new path of the file. @@ -63,16 +92,16 @@ type FSEvent struct { OldPath string // Op is the file system event: create, write, rename, remove Op Operation - // Info describes the file in the event. - Info os.FileInfo + // Descriptor describes the file in the event. + Descriptor FileDescriptor } // FSScanner retrieves a list of files from the file system. type FSScanner interface { // GetFiles returns the list of monitored files. // The keys of the map are the paths to the files and - // the values are the FileInfos describing the file. - GetFiles() map[string]os.FileInfo + // the values are the file descriptors that contain all necessary information about the file. + GetFiles() map[string]FileDescriptor } // FSWatcher returns file events of the monitored files. 
diff --git a/filebeat/input/filestream/logger.go b/filebeat/input/filestream/logger.go index 7963f11308dc..7b644fd0d877 100644 --- a/filebeat/input/filestream/logger.go +++ b/filebeat/input/filestream/logger.go @@ -28,8 +28,11 @@ func loggerWithEvent(logger *logp.Logger, event loginp.FSEvent, src loginp.Sourc "operation", event.Op.String(), "source_name", src.Name(), ) - if event.Info != nil && event.Info.Sys() != nil { - log = log.With("os_id", file.GetOSState(event.Info)) + if event.Descriptor.Fingerprint != "" { + log = log.With("fingerprint", event.Descriptor.Fingerprint) + } + if event.Descriptor.Info != nil && event.Descriptor.Info.Sys() != nil { + log = log.With("os_id", file.GetOSState(event.Descriptor.Info)) } if event.NewPath != "" { log = log.With("new_path", event.NewPath) diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 1e4a9c91c7fa..336461fede57 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -78,12 +78,12 @@ func (p *fileProspector) Init( return "", nil } - fi, ok := files[fm.Source] + fd, ok := files[fm.Source] if !ok { return "", fm } - newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Info: fi})) + newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd})) return newKey, fm }) @@ -109,13 +109,13 @@ func (p *fileProspector) Init( return "", nil } - fi, ok := files[fm.Source] + fd, ok := files[fm.Source] if !ok { return "", fm } if fm.IdentifierName != identifierName { - newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Info: fi}).Name() + newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}).Name() fm.IdentifierName = identifierName return newKey, fm } @@ -188,7 +188,7 @@ func (p *fileProspector) onFSEvent( } if p.isFileIgnored(log, event, ignoreSince) { - err := updater.ResetCursor(src, state{Offset: event.Info.Size()}) + err := 
updater.ResetCursor(src, state{Offset: event.Descriptor.Info.Size()}) if err != nil { log.Errorf("setting cursor for ignored file: %v", err) } @@ -224,12 +224,12 @@ func (p *fileProspector) onFSEvent( func (p *fileProspector) isFileIgnored(log *logp.Logger, fe loginp.FSEvent, ignoreInactiveSince time.Time) bool { if p.ignoreOlder > 0 { now := time.Now() - if now.Sub(fe.Info.ModTime()) > p.ignoreOlder { + if now.Sub(fe.Descriptor.Info.ModTime()) > p.ignoreOlder { log.Debugf("Ignore file because ignore_older reached. File %s", fe.NewPath) return true } } - if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 { + if !ignoreInactiveSince.IsZero() && fe.Descriptor.Info.ModTime().Sub(ignoreInactiveSince) <= 0 { log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath) return true } diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go index 75a5e8dc3aad..5142704a614d 100644 --- a/filebeat/input/filestream/prospector_creator.go +++ b/filebeat/input/filestream/prospector_creator.go @@ -24,6 +24,8 @@ import ( loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) const ( @@ -36,16 +38,25 @@ const ( var experimentalWarning sync.Once func newProspector(config config) (loginp.Prospector, error) { + err := checkConfigCompatibility(config.FileWatcher, config.FileIdentity) + if err != nil { + return nil, err + } + filewatcher, err := newFileWatcher(config.Paths, config.FileWatcher) if err != nil { return nil, fmt.Errorf("error while creating filewatcher %w", err) } - identifier, err := newFileIdentifier(config.FileIdentity, getIdentifierSuffix(config)) + identifier, err := newFileIdentifier(config.FileIdentity, config.Reader.Parsers.Suffix) if err != nil { 
return nil, fmt.Errorf("error while creating file identifier: %w", err) } + logp.L(). + With("filestream_id", config.ID). + Debugf("file identity is set to %s", identifier.Name()) + fileprospector := fileProspector{ filewatcher: filewatcher, identifier: identifier, @@ -104,6 +115,22 @@ func newProspector(config config) (loginp.Prospector, error) { return nil, fmt.Errorf("no such rotation method: %s", rotationMethod) } -func getIdentifierSuffix(config config) string { - return config.Reader.Parsers.Suffix +func checkConfigCompatibility(fileWatcher, fileIdentifier *conf.Namespace) error { + var fwCfg struct { + Fingerprint struct { + Enabled bool `config:"enabled"` + } `config:"fingerprint"` + } + + if fileWatcher != nil && fileIdentifier != nil && fileIdentifier.Name() == fingerprintName { + err := fileWatcher.Config().Unpack(&fwCfg) + if err != nil { + return fmt.Errorf("failed to parse file watcher configuration: %w", err) + } + if !fwCfg.Fingerprint.Enabled { + return fmt.Errorf("fingerprint file identity can be used only when fingerprint is enabled in the scanner") + } + } + + return nil } diff --git a/filebeat/input/filestream/prospector_creator_test.go b/filebeat/input/filestream/prospector_creator_test.go index bb87cc7118d1..c49488ffd9cc 100644 --- a/filebeat/input/filestream/prospector_creator_test.go +++ b/filebeat/input/filestream/prospector_creator_test.go @@ -21,31 +21,95 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + conf "github.com/elastic/elastic-agent-libs/config" ) -func TestCreateProspector_SetIgnoreInactiveSince(t *testing.T) { - testCases := map[string]struct { - ignore_inactive_since string - }{ - "ignore_inactive_since set to since_last_start": { - ignore_inactive_since: "since_last_start", - }, - "ignore_inactive_since set to since_first_start": { - ignore_inactive_since: "since_first_start", - }, - "ignore_inactive_since not set": { - ignore_inactive_since: "", - }, - } - for name, test 
:= range testCases { - test := test - t.Run(name, func(t *testing.T) { - c := config{ - IgnoreInactive: ignoreInactiveSettings[test.ignore_inactive_since], - } - p, _ := newProspector(c) - fileProspector := p.(*fileProspector) - assert.Equal(t, fileProspector.ignoreInactiveSince, ignoreInactiveSettings[test.ignore_inactive_since]) - }) - } +func TestCreateProspector(t *testing.T) { + t.Run("SetIgnoreInactiveSince", func(t *testing.T) { + testCases := map[string]struct { + ignore_inactive_since string + }{ + "ignore_inactive_since set to since_last_start": { + ignore_inactive_since: "since_last_start", + }, + "ignore_inactive_since set to since_first_start": { + ignore_inactive_since: "since_first_start", + }, + "ignore_inactive_since not set": { + ignore_inactive_since: "", + }, + } + for name, test := range testCases { + test := test + t.Run(name, func(t *testing.T) { + c := config{ + IgnoreInactive: ignoreInactiveSettings[test.ignore_inactive_since], + } + p, _ := newProspector(c) + fileProspector := p.(*fileProspector) + assert.Equal(t, fileProspector.ignoreInactiveSince, ignoreInactiveSettings[test.ignore_inactive_since]) + }) + } + }) + t.Run("file watcher and file identity compatibility", func(t *testing.T) { + cases := []struct { + name string + cfgStr string + err string + }{ + { + name: "returns no error for a fully default config", + cfgStr: ` +paths: ['some'] +`, + }, + { + name: "returns no error when fingerprint and identity is configured", + cfgStr: ` +paths: ['some'] +file_identity.fingerprint: ~ +prospector.scanner.fingerprint.enabled: true +`, + }, + { + name: "returns no error when fingerprint and other identity is configured", + cfgStr: ` +paths: ['some'] +file_identity.path: ~ +prospector.scanner.fingerprint.enabled: true +`, + }, + { + name: "returns error when fingerprint is disabled but fingerprint identity is configured", + cfgStr: ` +paths: ['some'] +file_identity.fingerprint: ~ +prospector.scanner.fingerprint.enabled: false +`, + err: 
"fingerprint file identity can be used only when fingerprint is enabled in the scanner", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + c, err := conf.NewConfigWithYAML([]byte(tc.cfgStr), tc.cfgStr) + require.NoError(t, err) + + cfg := defaultConfig() + err = c.Unpack(&cfg) + require.NoError(t, err) + + _, err = newProspector(cfg) + if tc.err == "" { + require.NoError(t, err) + return + } + + require.Error(t, err) + require.Contains(t, err.Error(), tc.err) + }) + } + }) } diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go index c3860856e584..db30697ddbab 100644 --- a/filebeat/input/filestream/prospector_test.go +++ b/filebeat/input/filestream/prospector_test.go @@ -21,6 +21,7 @@ package filestream import ( "context" "fmt" + "io/fs" "io/ioutil" "os" "sync" @@ -39,7 +40,7 @@ import ( func TestProspector_InitCleanIfRemoved(t *testing.T) { testCases := map[string]struct { entries map[string]loginp.Value - filesOnDisk map[string]os.FileInfo + filesOnDisk map[string]loginp.FileDescriptor cleanRemoved bool expectedCleanedKeys []string }{ @@ -108,7 +109,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) { testCases := map[string]struct { entries map[string]loginp.Value - filesOnDisk map[string]os.FileInfo + filesOnDisk map[string]loginp.FileDescriptor expectedUpdatedKeys map[string]string }{ "prospector init does not update keys if there are no entries": { @@ -137,8 +138,8 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) { }, }, }, - filesOnDisk: map[string]os.FileInfo{ - tmpFileName: fi, + filesOnDisk: map[string]loginp.FileDescriptor{ + tmpFileName: {Info: fi}, }, expectedUpdatedKeys: map[string]string{"not_path::key1": "path::" + tmpFileName}, }, @@ -170,8 +171,8 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { }{ "two new files": { events: []loginp.FSEvent{ - {Op: loginp.OpCreate, NewPath: "/path/to/file", Info: testFileInfo{}}, - {Op: loginp.OpCreate, 
NewPath: "/path/to/other/file", Info: testFileInfo{}}, + {Op: loginp.OpCreate, NewPath: "/path/to/file", Descriptor: createTestFileDescriptor()}, + {Op: loginp.OpCreate, NewPath: "/path/to/other/file", Descriptor: createTestFileDescriptor()}, }, expectedEvents: []harvesterEvent{ harvesterStart("path::/path/to/file"), @@ -181,7 +182,7 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { }, "one updated file": { events: []loginp.FSEvent{ - {Op: loginp.OpWrite, NewPath: "/path/to/file", Info: testFileInfo{}}, + {Op: loginp.OpWrite, NewPath: "/path/to/file", Descriptor: createTestFileDescriptor()}, }, expectedEvents: []harvesterEvent{ harvesterStart("path::/path/to/file"), @@ -190,8 +191,8 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { }, "one updated then truncated file": { events: []loginp.FSEvent{ - {Op: loginp.OpWrite, NewPath: "/path/to/file", Info: testFileInfo{}}, - {Op: loginp.OpTruncate, NewPath: "/path/to/file", Info: testFileInfo{}}, + {Op: loginp.OpWrite, NewPath: "/path/to/file", Descriptor: createTestFileDescriptor()}, + {Op: loginp.OpTruncate, NewPath: "/path/to/file", Descriptor: createTestFileDescriptor()}, }, expectedEvents: []harvesterEvent{ harvesterStart("path::/path/to/file"), @@ -202,14 +203,14 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { "old files with ignore older configured": { events: []loginp.FSEvent{ { - Op: loginp.OpCreate, - NewPath: "/path/to/file", - Info: testFileInfo{"/path/to/file", 5, minuteAgo, nil}, + Op: loginp.OpCreate, + NewPath: "/path/to/file", + Descriptor: createTestFileDescriptorWithInfo(testFileInfo{"/path/to/file", 5, minuteAgo, nil}), }, { - Op: loginp.OpWrite, - NewPath: "/path/to/other/file", - Info: testFileInfo{"/path/to/other/file", 5, minuteAgo, nil}, + Op: loginp.OpWrite, + NewPath: "/path/to/other/file", + Descriptor: createTestFileDescriptorWithInfo(testFileInfo{"/path/to/other/file", 5, minuteAgo, nil}), }, }, ignoreOlder: 10 * time.Second, @@ -220,14 +221,14 @@ func 
TestProspectorNewAndUpdatedFiles(t *testing.T) { "newer files with ignore older": { events: []loginp.FSEvent{ { - Op: loginp.OpCreate, - NewPath: "/path/to/file", - Info: testFileInfo{"/path/to/file", 5, minuteAgo, nil}, + Op: loginp.OpCreate, + NewPath: "/path/to/file", + Descriptor: createTestFileDescriptorWithInfo(testFileInfo{"/path/to/file", 5, minuteAgo, nil}), }, { - Op: loginp.OpWrite, - NewPath: "/path/to/other/file", - Info: testFileInfo{"/path/to/other/file", 5, minuteAgo, nil}, + Op: loginp.OpWrite, + NewPath: "/path/to/other/file", + Descriptor: createTestFileDescriptorWithInfo(testFileInfo{"/path/to/other/file", 5, minuteAgo, nil}), }, }, ignoreOlder: 5 * time.Minute, @@ -265,14 +266,14 @@ func TestProspectorHarvesterUpdateIgnoredFiles(t *testing.T) { minuteAgo := time.Now().Add(-1 * time.Minute) eventCreate := loginp.FSEvent{ - Op: loginp.OpCreate, - NewPath: "/path/to/file", - Info: testFileInfo{"/path/to/file", 5, minuteAgo, nil}, + Op: loginp.OpCreate, + NewPath: "/path/to/file", + Descriptor: createTestFileDescriptorWithInfo(testFileInfo{"/path/to/file", 5, minuteAgo, nil}), } eventUpdated := loginp.FSEvent{ - Op: loginp.OpWrite, - NewPath: "/path/to/file", - Info: testFileInfo{"/path/to/file", 10, time.Now(), nil}, + Op: loginp.OpWrite, + NewPath: "/path/to/file", + Descriptor: createTestFileDescriptorWithInfo(testFileInfo{"/path/to/file", 10, time.Now(), nil}), } expectedEvents := []harvesterEvent{ harvesterStart("path::/path/to/file"), @@ -328,13 +329,13 @@ func TestProspectorDeletedFile(t *testing.T) { }{ "one deleted file without clean removed": { events: []loginp.FSEvent{ - {Op: loginp.OpDelete, OldPath: "/path/to/file", Info: testFileInfo{}}, + {Op: loginp.OpDelete, OldPath: "/path/to/file", Descriptor: createTestFileDescriptor()}, }, cleanRemoved: false, }, "one deleted file with clean removed": { events: []loginp.FSEvent{ - {Op: loginp.OpDelete, OldPath: "/path/to/file", Info: testFileInfo{}}, + {Op: loginp.OpDelete, OldPath: 
"/path/to/file", Descriptor: createTestFileDescriptor()}, }, cleanRemoved: true, }, @@ -377,10 +378,10 @@ func TestProspectorRenamedFile(t *testing.T) { "one renamed file without rename tracker": { events: []loginp.FSEvent{ { - Op: loginp.OpRename, - OldPath: "/old/path/to/file", - NewPath: "/new/path/to/file", - Info: testFileInfo{}, + Op: loginp.OpRename, + OldPath: "/old/path/to/file", + NewPath: "/new/path/to/file", + Descriptor: createTestFileDescriptor(), }, }, expectedEvents: []harvesterEvent{ @@ -392,10 +393,10 @@ func TestProspectorRenamedFile(t *testing.T) { "one renamed file with rename tracker": { events: []loginp.FSEvent{ { - Op: loginp.OpRename, - OldPath: "/old/path/to/file", - NewPath: "/new/path/to/file", - Info: testFileInfo{}, + Op: loginp.OpRename, + OldPath: "/old/path/to/file", + NewPath: "/new/path/to/file", + Descriptor: createTestFileDescriptor(), }, }, trackRename: true, @@ -406,10 +407,10 @@ func TestProspectorRenamedFile(t *testing.T) { "one renamed file with rename tracker with close renamed": { events: []loginp.FSEvent{ { - Op: loginp.OpRename, - OldPath: "/old/path/to/file", - NewPath: "/new/path/to/file", - Info: testFileInfo{}, + Op: loginp.OpRename, + OldPath: "/old/path/to/file", + NewPath: "/new/path/to/file", + Descriptor: createTestFileDescriptor(), }, }, trackRename: true, @@ -503,7 +504,7 @@ func (t *testHarvesterGroup) StopHarvesters() error { type mockFileWatcher struct { events []loginp.FSEvent - filesOnDisk map[string]os.FileInfo + filesOnDisk map[string]loginp.FileDescriptor outputCount, eventCount int @@ -523,7 +524,7 @@ func newMockFileWatcher(events []loginp.FSEvent, eventCount int) *mockFileWatche // newMockFileWatcherWithFiles creates an FSWatch mock to // get the required file information from the file system using // the GetFiles function. 
-func newMockFileWatcherWithFiles(filesOnDisk map[string]os.FileInfo) *mockFileWatcher { +func newMockFileWatcherWithFiles(filesOnDisk map[string]loginp.FileDescriptor) *mockFileWatcher { return &mockFileWatcher{ filesOnDisk: filesOnDisk, out: make(chan loginp.FSEvent), @@ -542,7 +543,7 @@ func (m *mockFileWatcher) Event() loginp.FSEvent { func (m *mockFileWatcher) Run(_ unison.Canceler) {} -func (m *mockFileWatcher) GetFiles() map[string]os.FileInfo { return m.filesOnDisk } +func (m *mockFileWatcher) GetFiles() map[string]loginp.FileDescriptor { return m.filesOnDisk } type mockMetadataUpdater struct { table map[string]interface{} @@ -668,10 +669,10 @@ func TestOnRenameFileIdentity(t *testing.T) { populateStore: true, events: []loginp.FSEvent{ { - Op: loginp.OpRename, - OldPath: "/old/path/to/file", - NewPath: "/new/path/to/file", - Info: testFileInfo{}, + Op: loginp.OpRename, + OldPath: "/old/path/to/file", + NewPath: "/new/path/to/file", + Descriptor: createTestFileDescriptor(), }, }, }, @@ -681,10 +682,10 @@ func TestOnRenameFileIdentity(t *testing.T) { populateStore: false, events: []loginp.FSEvent{ { - Op: loginp.OpRename, - OldPath: "/old/path/to/file", - NewPath: "/new/path/to/file", - Info: testFileInfo{}, + Op: loginp.OpRename, + OldPath: "/old/path/to/file", + NewPath: "/new/path/to/file", + Descriptor: createTestFileDescriptor(), }, }, }, @@ -721,3 +722,29 @@ func TestOnRenameFileIdentity(t *testing.T) { }) } } + +type testFileInfo struct { + path string + size int64 + time time.Time + sys interface{} +} + +func (t testFileInfo) Name() string { return t.path } +func (t testFileInfo) Size() int64 { return t.size } +func (t testFileInfo) Mode() os.FileMode { return 0 } +func (t testFileInfo) ModTime() time.Time { return t.time } +func (t testFileInfo) IsDir() bool { return false } +func (t testFileInfo) Sys() interface{} { return t.sys } + +func createTestFileDescriptor() loginp.FileDescriptor { + return createTestFileDescriptorWithInfo(testFileInfo{}) +} 
+ +func createTestFileDescriptorWithInfo(fi fs.FileInfo) loginp.FileDescriptor { + return loginp.FileDescriptor{ + Info: fi, + Fingerprint: "fingerprint", + Filename: "filename", + } +} diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 9c9481ed37a5..5733bcd6ad7d 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2892,6 +2892,19 @@ filebeat.inputs: # original for harvesting but will report the symlink name as the source. #prospector.scanner.symlinks: false + # If enabled, instead of relying on the device ID and inode values when comparing files, + # compare hashes of the given byte ranges in files. A file becomes an ingest target + # when its size grows larger than offset+length (see below). Until then it's ignored. + #prospector.scanner.fingerprint.enabled: false + + # If fingerprint mode is enabled, sets the offset from the beginning of the file + # for the byte range used for computing the fingerprint value. + #prospector.scanner.fingerprint.offset: 0 + + # If fingerprint mode is enabled, sets the length of the byte range used for + # computing the fingerprint value. Cannot be less than 64 bytes. + #prospector.scanner.fingerprint.length: 1024 + ### Parsers configuration #### JSON configuration