From 14a22200398661f455e731b4f6d3ed8f2a2b92bb Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 27 Jun 2016 14:08:04 +0200 Subject: [PATCH] Introducing ttl and timestamp in state Each state has a timestamp and a ttl. -1 means ttl is disable, 0 means it should be directly removed. This moves the logic on what should happen with a state completely to the state itself and makes it possible to use states in different use cases. The advantage of the ttl is that it does not depend in filebeat on the modification time which can be incorrect. This means, in case a file is rotated, the timestamp of a file is updated and it also counts as a new state. This makes sense as otherwise it could happen that the state of a rotate file is removed and then after rotation the file is picked up again as a completely new file. The downside is that people using filebeat must understand the difference between the state timestamp and modtime. In general timestamp is neweer then the modtime, as filebeat finishes reading later. On the registrar side, the cleanup happens every time before the registry is written. On the prospector side the state is cleaned up after each scan. It can happen that the prospector state list and registrar state list are not 100% in sync as they don't cleanup the states at the same time. The prospector state is the one that will always overwrite the registrar state. No cleanup is done before shutdown. It is important, that on startup first a full scan is done to update the states before the state is cleaned up, otherwise still needed states could be removed. This is part of https://github.com/elastic/beats/issues/1600 Additional: * Fixed offset change for prospector to start harvester for old files. Note: * Nice part of this is that registrar does not have to now about expiry and removal of files, state is communication channel. --- filebeat/etc/beat.full.yml | 6 ++ filebeat/filebeat.full.yml | 4 + filebeat/input/file/state.go | 27 ++++--- filebeat/prospector/config.go | 11 +-- filebeat/prospector/prospector.go | 4 + filebeat/prospector/prospector_log.go | 22 ++++-- filebeat/registrar/registrar.go | 11 ++- filebeat/tests/system/config/filebeat.yml.j2 | 2 + filebeat/tests/system/filebeat.py | 2 +- filebeat/tests/system/test_registrar.py | 79 +++++++++++++++++++- 10 files changed, 135 insertions(+), 33 deletions(-) diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml index 1b77cc8c4575..dffa3a746a12 100644 --- a/filebeat/etc/beat.full.yml +++ b/filebeat/etc/beat.full.yml @@ -179,8 +179,14 @@ filebeat.prospectors: # Closes the file handler as soon as the harvesters reaches then end of the file. # The file will be picked up again by the harvester at previous known state # after scan_frequency in case the file can still be discovered by the prospector. + # Note: Potential data loss if file is deleted / moved before picked up again after + # scan_frequency by prospector #close_eof: false + # Files for the modification data is older then clean_older the state from the registry is removed + # By default this is disabled. + #clean_older: 0 + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 1c1725e57f15..6773eebec378 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -181,6 +181,10 @@ filebeat.prospectors: # after scan_frequency in case the file can still be discovered by the prospector. #close_eof: false + # Files for the modification data is older then clean_older the state from the registry is removed + # By default this is disabled. + #clean_older: 0 + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 2c1ba908bc45..96b7c585ff09 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -15,7 +15,8 @@ type State struct { Finished bool `json:"-"` // harvester state Fileinfo os.FileInfo `json:"-"` // the file info FileStateOS StateOS - LastSeen time.Time `json:"last_seen"` + Timestamp time.Time `json:"timestamp"` + TTL time.Duration `json:"ttl"` } // NewState creates a new file state @@ -25,7 +26,8 @@ func NewState(fileInfo os.FileInfo, path string) State { Source: path, Finished: false, FileStateOS: GetOSState(fileInfo), - LastSeen: time.Now(), + Timestamp: time.Now(), + TTL: -1 * time.Second, // By default, state does have an infinit ttl } } @@ -47,7 +49,7 @@ func (s *States) Update(newState State) { defer s.mutex.Unlock() index, _ := s.findPrevious(newState) - newState.LastSeen = time.Now() + newState.Timestamp = time.Now() if index >= 0 { s.states[index] = newState @@ -81,25 +83,30 @@ func (s *States) findPrevious(newState State) (int, State) { } // Cleanup cleans up the state array. All states which are older then `older` are removed -func (s *States) Cleanup(older time.Duration) { +func (s *States) Cleanup() { + s.mutex.Lock() defer s.mutex.Unlock() - for i, state := range s.states { + currentTime := time.Now() + states := s.states[:0] - // File wasn't seen for longer then older -> remove state - if time.Since(state.LastSeen) > older { - logp.Debug("prospector", "State removed for %s because of older: %s", state.Source) - s.states = append(s.states[:i], s.states[i+1:]...) + for _, state := range s.states { + ttl := state.TTL + if ttl >= 0 && currentTime.Sub(state.Timestamp) > ttl { + logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl) + continue // drop state } + states = append(states, state) // in-place copy old state } - + s.states = states } // Count returns number of states func (s *States) Count() int { s.mutex.Lock() defer s.mutex.Unlock() + return len(s.states) } diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index 85f201b7ac05..ff2f7dd8b774 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -8,16 +8,12 @@ import ( cfg "github.com/elastic/beats/filebeat/config" ) -const ( - DefaultIgnoreOlder time.Duration = 0 - DefaultScanFrequency time.Duration = 10 * time.Second -) - var ( defaultConfig = prospectorConfig{ - IgnoreOlder: DefaultIgnoreOlder, - ScanFrequency: DefaultScanFrequency, + IgnoreOlder: 0, + ScanFrequency: 10 * time.Second, InputType: cfg.DefaultInputType, + CleanOlder: 0, } ) @@ -27,6 +23,7 @@ type prospectorConfig struct { Paths []string `config:"paths"` ScanFrequency time.Duration `config:"scan_frequency"` InputType string `config:"input_type"` + CleanOlder time.Duration `config:"clean_older" validate:"min=0"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 287f378c0ef5..73d1844d9968 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -105,6 +105,10 @@ func (p *Prospector) Run() { logp.Info("Prospector channel stopped") return case event := <-p.harvesterChan: + // Add ttl if cleanOlder is enabled + if p.config.CleanOlder > 0 { + event.FileState.TTL = p.config.CleanOlder + } select { case <-p.done: logp.Info("Prospector channel stopped") diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 02f27f4fe717..d9cab7e5afe7 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -12,8 +12,9 @@ import ( type ProspectorLog struct { Prospector *Prospector - lastscan time.Time config prospectorConfig + lastScan time.Time + lastClean time.Time } func NewProspectorLog(p *Prospector) (*ProspectorLog, error) { @@ -40,6 +41,7 @@ func (p *ProspectorLog) Init() { // Overwrite prospector states p.Prospector.states.SetStates(fileStates) + p.lastClean = time.Now() logp.Info("Previous states loaded: %v", p.Prospector.states.Count()) } @@ -48,10 +50,13 @@ func (p *ProspectorLog) Run() { logp.Debug("prospector", "Start next scan") p.scan() - // Only cleanup states if enabled - if p.config.IgnoreOlder != 0 { - p.Prospector.states.Cleanup(p.config.IgnoreOlder) + + // It is important that a first scan is run before cleanup to make sure all new states are read first + if p.config.CleanOlder > 0 { + p.Prospector.states.Cleanup() + logp.Debug("prospector", "Prospector states cleaned up.") } + p.lastScan = time.Now() } // getFiles returns all files which have to be harvested @@ -104,7 +109,7 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo { // Scan starts a scanGlob for each provided path/glob func (p *ProspectorLog) scan() { - newlastscan := time.Now() + newLastScan := time.Now() // TODO: Track harvesters to prevent any file from being harvested twice. Finished state could be delayed? // Now let's do one quick scan to pick up new files @@ -126,7 +131,7 @@ func (p *ProspectorLog) scan() { } } - p.lastscan = newlastscan + p.lastScan = newLastScan } // harvestNewFile harvest a new file @@ -145,10 +150,12 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S logp.Debug("prospector", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) + // TODO: check for ignore_older reached? or should that happen in scan already? + // No harvester is running for the file, start a new harvester // It is important here that only the size is checked and not modification time, as modification time could be incorrect on windows // https://blogs.technet.microsoft.com/asiasupp/2010/12/14/file-date-modified-property-are-not-updating-while-modifying-a-file-without-closing-it/ - if oldState.Finished && newState.Fileinfo.Size() > newState.Offset { + if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset { // Resume harvesting of an old file we've stopped harvesting from // This could also be an issue with force_close_older that a new harvester is started after each scan but not needed? // One problem with comparing modTime is that it is in seconds, and scans can happen more then once a second @@ -166,6 +173,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S // Update state because of file rotation h.SendStateUpdate() } else { + // TODO: improve logging depedent on what the exact reason is that harvesting does not continue // Nothing to do. Harvester is still running and file was not renamed logp.Debug("prospector", "No updates needed, file %s is already harvested.", newState.Source) } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 4d0d3c34eca2..ebd9274fdb33 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -132,8 +132,8 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool { logp.Info("Old registry states found: %v", len(oldStates)) counter := 0 for _, state := range oldStates { - // Makes time last_seen time of migration, as this is the best guess - state.LastSeen = time.Now() + // Makes timestamp time of migration, as this is the best guess + state.Timestamp = time.Now() states[counter] = state counter++ } @@ -180,8 +180,10 @@ func (r *Registrar) Run() { r.processEventStates(events) } - if e := r.writeRegistry(); e != nil { - logp.Err("Writing of registry returned error: %v. Continuing..", e) + r.states.Cleanup() + logp.Debug("registrar", "Registrar states cleaned up.") + if err := r.writeRegistry(); err != nil { + logp.Err("Writing of registry returned error: %v. Continuing...", err) } } } @@ -219,6 +221,7 @@ func (r *Registrar) writeRegistry() error { return err } + // First clean up states states := r.states.GetStates() encoder := json.NewEncoder(f) diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 5b6af299cf60..c8b6b05209f2 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -22,6 +22,8 @@ filebeat.prospectors: close_removed: {{close_removed}} close_renamed: {{close_renamed}} close_eof: {{close_eof}} + force_close_files: {{force_close_files}} + clean_older: {{clean_older}} {% if fields %} fields: diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index cb6372a53bf5..c6db2731d439 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -39,7 +39,7 @@ def get_registry_entry_by_path(self, path): if tmp_entry == None: tmp_entry = entry else: - if tmp_entry["last_seen"] < entry["last_seen"]: + if tmp_entry["timestamp"] < entry["timestamp"]: tmp_entry = entry return tmp_entry diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 95f90fed2982..cf808ad29f5a 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -636,13 +636,15 @@ def test_migration_non_windows(self): # Compare first entry oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Compare second entry oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Make sure the right number of entries is in @@ -686,15 +688,84 @@ def test_migration_windows(self): # Compare first entry oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Compare second entry oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Make sure the right number of entries is in data = self.get_registry() assert len(data) == 2 + + + def test_clean_older(self): + """ + Checks that states are properly removed after clean_older + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + clean_older="4s", + ignoreOlder="2s", + closeOlder="0.2s", + scan_frequency="0.1s" + ) + + + os.mkdir(self.working_dir + "/log/") + testfile1 = self.working_dir + "/log/input1" + testfile2 = self.working_dir + "/log/input2" + testfile3 = self.working_dir + "/log/input3" + + with open(testfile1, 'w') as f: + f.write("first file\n") + + with open(testfile2, 'w') as f: + f.write("second file\n") + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_registry() + assert len(data) == 2 + + # Wait until states are removed from prospectors + self.wait_until( + lambda: self.log_contains_count( + "State removed for") == 2, + max_timeout=15) + + with open(testfile3, 'w') as f: + f.write("2\n") + + # Write new file to make sure registrar is flushed again + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=30) + + # Wait until states are removed from prospectors + self.wait_until( + lambda: self.log_contains_count( + "State removed for") == 4, + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check that the first to files were removed from the registry + data = self.get_registry() + assert len(data) == 1 + + # Make sure the last file in the registry is the correct one and has the correct offset + if os.name == "nt": + assert data[0]["offset"] == 3 + else: + assert data[0]["offset"] == 2 +