From 70ed5a3fc00e4f071d081e1b576eb28a575d9684 Mon Sep 17 00:00:00 2001 From: ruflin Date: Wed, 7 Sep 2016 09:10:27 +0200 Subject: [PATCH] Add experimental symlink support This implementation of symlinks takes a symlink and opens the original file. The only difference is that as Source the symlink path is reported. The advantage of this implementation is that everything related to file handling is identical with non symlink files as the original file is harvested. All close_* options work as expected. State information is stored for the original file, not the symlink itself. In case a symlink and original file are harvested, only one will be harvested as they share the same inode information. * Add test to verify that symlinks are disabled by default * Add test for symlink handling * Improve error handling in harvester. Return proper error messages instead of logging it. Prevents too many log messages. * Remove IsRegular file call as not needed anymore and leads to additional Stat calls * Add documentation See https://github.com/elastic/beats/issues/2277 and https://github.com/elastic/beats/pull/1767 --- CHANGELOG.asciidoc | 1 + .../configuration/filebeat-options.asciidoc | 12 + filebeat/etc/beat.full.yml | 4 + filebeat/filebeat.full.yml | 4 + filebeat/harvester/log.go | 17 +- filebeat/input/file/file.go | 25 -- filebeat/prospector/config.go | 2 + filebeat/prospector/prospector_log.go | 30 +- filebeat/tests/system/config/filebeat.yml.j2 | 1 + filebeat/tests/system/test_harvester.py | 271 ++++++++++++++++++ 10 files changed, 327 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4a231243d41..962948eea40 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -86,6 +86,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d - Update to Go 1.7. {pull}2306[2306] - Log total non-zero internal metrics on shutdown. {pull}2349[2349] - Add support for encrypted private key files by introducing `ssl.key_passphrase` setting. {pull}2330[2330] +- Add experimental symlink support with `symlinks` config {pull}2478[2478] *Metricbeat* diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index 8521057d380..146172f88b6 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -391,6 +391,18 @@ This option applies to files that Filebeat has not already processed. If you ran NOTE: You can use this setting to avoid indexing old log lines when you run Filebeat on a set of log files for the first time. After the first run, we recommend disabling this option, or you risk losing lines during file rotation. +===== symlinks + +experimental[] + +The symlinks options allows to also harvest symlinks in addition to regular files. In case of symlinks, the file that is opened and read in the end is the original file even though the path of the symlink is reported. + +It must be made sure, that if a symlink is defined for harvesting, the original is excluded. In case the symlink and original file are harvested by the same prospector, this will be detected and only one of the two files will be harvested which is the first one found by filebeat. In case the symlink and original are defined in two different prospectors, it can happen that both will be harvested at the same time, send duplicated data and will overwrite each other state. + +This option can be useful if symlinks to the log files have additional meta data in the file name which can be processed in Logstash. This is for example the case for kubernetes log files. + +As this option may lead to data loss, it is disabled by default. + ===== backoff The backoff options specify how aggressively Filebeat crawls new files for updates. diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml index 8af8d676853..b760baf2d26 100644 --- a/filebeat/etc/beat.full.yml +++ b/filebeat/etc/beat.full.yml @@ -143,6 +143,10 @@ filebeat.prospectors: # this can mean that the first entries of a new file are skipped. #tail_files: false + # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the + # original for harvesting but will report the symlink name as source. + #symlinks: false + # Backoff values define how aggressively filebeat crawls new files for updates # The default values can be used in most cases. Backoff defines how long it is waited # to check a file again after EOF is reached. Default is 1s which means the file diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 801bb77e29b..52bb9029c4d 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -143,6 +143,10 @@ filebeat.prospectors: # this can mean that the first entries of a new file are skipped. #tail_files: false + # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the + # original for harvesting but will report the symlink name as source. + #symlinks: false + # Backoff values define how aggressively filebeat crawls new files for updates # The default values can be used in most cases. Backoff defines how long it is waited # to check a file again after EOF is reached. Default is 1s which means the file diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index c9d6b7abe70..19d7e8b5d0f 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -10,6 +10,8 @@ import ( "golang.org/x/text/transform" + "fmt" + "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/harvester/source" @@ -163,8 +165,7 @@ func (h *Harvester) openFile() error { f, err := file.ReadOpen(h.state.Source) if err != nil { - logp.Err("Failed opening %s: %s", h.state.Source, err) - return err + return fmt.Errorf("Failed opening %s: %s", h.state.Source, err) } harvesterOpenFiles.Add(1) @@ -182,16 +183,16 @@ func (h *Harvester) openFile() error { } func (h *Harvester) validateFile(f *os.File) error { - // Check we are not following a rabbit hole (symlinks, etc.) - if !file.IsRegular(f) { - return errors.New("Given file is not a regular file.") - } info, err := f.Stat() if err != nil { - logp.Err("Failed getting stats for file %s: %s", h.state.Source, err) - return err + return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err) } + + if !info.Mode().IsRegular() { + return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name()) + } + // Compares the stat of the opened file to the state given by the prospector. Abort if not match. if !os.SameFile(h.state.Fileinfo, info) { return errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.") diff --git a/filebeat/input/file/file.go b/filebeat/input/file/file.go index 85dabced895..3f01758ae41 100644 --- a/filebeat/input/file/file.go +++ b/filebeat/input/file/file.go @@ -13,26 +13,6 @@ type File struct { State *State } -// Check that the file isn't a symlink, mode is regular or file is nil -func (f *File) IsRegular() bool { - if f.File == nil { - logp.Critical("Harvester: BUG: f arg is nil") - return false - } - - info, e := f.File.Stat() - if e != nil { - logp.Err("File check fault: stat error: %s", e.Error()) - return false - } - - if !info.Mode().IsRegular() { - logp.Warn("Harvester: not a regular file: %q %s", info.Mode(), info.Name()) - return false - } - return true -} - // Checks if the two files are the same. func (f *File) IsSameFile(f2 *File) bool { return os.SameFile(f.FileInfo, f2.FileInfo) @@ -49,8 +29,3 @@ func IsSameFile(path string, info os.FileInfo) bool { return os.SameFile(fileInfo, info) } - -func IsRegular(file *os.File) bool { - f := &File{File: file} - return f.IsRegular() -} diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index ecd886dbcc0..a8a282a14b4 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -16,6 +16,7 @@ var ( CleanInactive: 0, CleanRemoved: false, HarvesterLimit: 0, + Symlinks: false, } ) @@ -28,6 +29,7 @@ type prospectorConfig struct { CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` CleanRemoved bool `config:"clean_removed"` HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` + Symlinks bool `config:"symlinks"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 6133f29134e..af749b0f233 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -99,6 +99,7 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo { continue } + OUTER: // Check any matched files to see if we need to start a harvester for _, file := range matches { @@ -108,24 +109,39 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo { continue } - fileinfo, err := os.Lstat(file) + // Fetch Lstat File info to detected also symlinks + fileInfo, err := os.Lstat(file) if err != nil { logp.Debug("prospector", "stat(%s) failed: %s", file, err) continue } - // Check if file is symlink - if fileinfo.Mode()&os.ModeSymlink != 0 { - logp.Debug("prospector", "File %s skipped as it is a symlink.", file) + if fileInfo.IsDir() { + logp.Debug("prospector", "Skipping directory: %s", file) continue } - if fileinfo.IsDir() { - logp.Debug("prospector", "Skipping directory: %s", file) + isSymlink := fileInfo.Mode()&os.ModeSymlink > 0 + if isSymlink && !p.config.Symlinks { + logp.Debug("prospector", "File %s skipped as it is a symlink.", file) continue } - paths[file] = fileinfo + // Fetch Stat file info which fetches the inode from the original and is used for comparison + fileInfo, err = os.Stat(file) + + // If symlink is enabled, it is checked that original is not part of same prospector + // It original is harvested by other prospector, states will potentially overwrite each other + if p.config.Symlinks { + for _, finfo := range paths { + if os.SameFile(finfo, fileInfo) { + logp.Info("Same file found as symlink and original. Skipping file: %s", file) + continue OUTER + } + } + } + + paths[file] = fileInfo } } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 49572333508..625fb9e9fa4 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -27,6 +27,7 @@ filebeat.prospectors: clean_inactive: {{clean_inactive}} clean_removed: {{clean_removed}} harvester_limit: {{harvester_limit | default(0) }} + symlinks: {{symlinks}} {% if fields %} fields: diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index e0b5a6991bb..53b685a778e 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -484,3 +484,274 @@ def test_boms(self): assert output[0]["message"] == message filebeat.kill_and_wait() + + def test_ignore_symlink(self): + """ + Test that symlinks are ignored + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/symlink.log", + ) + + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + symlink = self.working_dir + "/log/symlink.log" + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile, 0) + else: + os.symlink(logfile, symlink) + + with open(logfile, 'a') as file: + file.write("Hello World\n") + + filebeat = self.start_beat() + + # Make sure symlink is skipped + self.wait_until( + lambda: self.log_contains( + "skipped as it is a symlink"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + + def test_symlinks_enabled(self): + """ + Test if symlinks are harvested + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/symlink.log", + symlinks="true", + ) + + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + symlink = self.working_dir + "/log/symlink.log" + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile, 0) + else: + os.symlink(logfile, symlink) + + with open(logfile, 'a') as file: + file.write("Hello World\n") + + filebeat = self.start_beat() + + # Make sure content in symlink file is read + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + filebeat.check_kill_and_wait() + + + def test_symlink_rotated(self): + """ + Test what happens if symlink removed and points to a new file + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/symlink.log", + symlinks="true", + ) + + os.mkdir(self.working_dir + "/log/") + + logfile1 = self.working_dir + "/log/test1.log" + logfile2 = self.working_dir + "/log/test2.log" + symlink = self.working_dir + "/log/symlink.log" + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile1, 0) + else: + os.symlink(logfile1, symlink) + + with open(logfile1, 'a') as file: + file.write("Hello World1\n") + + with open(logfile2, 'a') as file: + file.write("Hello World2\n") + + filebeat = self.start_beat() + + + # Make sure symlink is skipped + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + os.remove(symlink) + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile2, 0) + else: + os.symlink(logfile2, symlink) + + with open(logfile1, 'a') as file: + file.write("Hello World3\n") + file.write("Hello World4\n") + + # Make sure new file and addition to old file were read + self.wait_until( + lambda: self.output_has(lines=4), + max_timeout=10) + + filebeat.check_kill_and_wait() + + # Check if two different files are in registry + data = self.get_registry() + assert len(data) == 2 + + + def test_symlink_removed(self): + """ + Tests that if a symlink to a file is removed, no further data is read which is added to the original file + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/symlink.log", + symlinks="true", + ) + + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + symlink = self.working_dir + "/log/symlink.log" + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile, 0) + else: + os.symlink(logfile, symlink) + + with open(logfile, 'a') as file: + file.write("Hello World1\n") + + filebeat = self.start_beat() + + # Make sure symlink is skipped + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + os.remove(symlink) + + with open(logfile, 'a') as file: + file.write("Hello World2n") + + # Sleep 1s to make sure new events are not picked up + time.sleep(1) + + # Make sure also new file was read + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + filebeat.check_kill_and_wait() + + # Check if two different files are in registry + data = self.get_registry() + assert len(data) == 1 + + def test_symlink_and_file(self): + """ + Tests that if symlink and original file are read, that only events from one are added + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + symlinks="true", + ) + + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + symlink = self.working_dir + "/log/symlink.log" + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile, 0) + else: + os.symlink(logfile, symlink) + + with open(logfile, 'a') as file: + file.write("Hello World1\n") + + filebeat = self.start_beat() + + # Make sure both files were read + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + filebeat.check_kill_and_wait() + + # Check if two different files are in registry + data = self.get_registry() + assert len(data) == 1 + + def test_truncate(self): + """ + Tests what happens if file is truncated and symlink recreated + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + symlinks="true", + ) + + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + symlink = self.working_dir + "/log/symlink.log" + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile, 0) + else: + os.symlink(logfile, symlink) + + with open(logfile, 'w') as file: + file.write("Hello World1\n") + file.write("Hello World2\n") + file.write("Hello World3\n") + file.write("Hello World4\n") + + filebeat = self.start_beat() + + # Make sure both files were read + self.wait_until( + lambda: self.output_has(lines=4), + max_timeout=10) + + os.remove(symlink) + with open(logfile, 'w') as file: + file.truncate() + file.seek(0) + + if os.name == "nt": + import win32file + win32file.CreateSymbolicLink(symlink, logfile, 0) + else: + os.symlink(logfile, symlink) + + # Write new file with content shorter then old one + with open(logfile, 'a') as file: + file.write("Hello World5\n") + file.write("Hello World6\n") + file.write("Hello World7\n") + + # Make sure both files were read + self.wait_until( + lambda: self.output_has(lines=7), + max_timeout=10) + + filebeat.check_kill_and_wait() + + # Check that only 1 registry entry as original was only truncated + data = self.get_registry() + assert len(data) == 1