diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go
index 40d8ff2ec47c..3b2097d12787 100644
--- a/filebeat/harvester/harvester.go
+++ b/filebeat/harvester/harvester.go
@@ -16,7 +16,6 @@ package harvester
 import (
 	"fmt"
 	"regexp"
-	"sync"
 
 	"github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester/encoding"
@@ -27,12 +26,11 @@ import (
 )
 
 type Harvester struct {
-	Path               string /* the file path to harvest */
-	Config             harvesterConfig
+	path               string /* the file path to harvest */
+	config             harvesterConfig
 	offset             int64
-	State              file.State
-	stateMutex         sync.Mutex
-	SpoolerChan        chan *input.FileEvent
+	state              file.State
+	prospectorChan     chan *input.FileEvent
 	encoding           encoding.EncodingFactory
 	file               source.FileSource /* the file being watched */
 	ExcludeLinesRegexp []*regexp.Regexp
@@ -44,42 +42,42 @@ func NewHarvester(
 	cfg *common.Config,
 	path string,
 	state file.State,
-	spooler chan *input.FileEvent,
+	prospectorChan chan *input.FileEvent,
 	offset int64,
 	done chan struct{},
 ) (*Harvester, error) {
 
 	h := &Harvester{
-		Path:        path,
-		Config:      defaultConfig,
-		State:       state,
-		SpoolerChan: spooler,
-		offset:      offset,
-		done:        done,
+		path:           path,
+		config:         defaultConfig,
+		state:          state,
+		prospectorChan: prospectorChan,
+		offset:         offset,
+		done:           done,
 	}
 
-	if err := cfg.Unpack(&h.Config); err != nil {
+	if err := cfg.Unpack(&h.config); err != nil {
 		return nil, err
 	}
-	if err := h.Config.Validate(); err != nil {
+	if err := h.config.Validate(); err != nil {
 		return nil, err
 	}
 
-	encoding, ok := encoding.FindEncoding(h.Config.Encoding)
+	encoding, ok := encoding.FindEncoding(h.config.Encoding)
 	if !ok || encoding == nil {
-		return nil, fmt.Errorf("unknown encoding('%v')", h.Config.Encoding)
+		return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
 	}
 	h.encoding = encoding
 
-	h.ExcludeLinesRegexp = h.Config.ExcludeLines
-	h.IncludeLinesRegexp = h.Config.IncludeLines
+	h.ExcludeLinesRegexp = h.config.ExcludeLines
+	h.IncludeLinesRegexp = h.config.IncludeLines
 
 	return h, nil
 }
 
 // open does open the file given under h.Path and assigns the file handler to h.file
 func (h *Harvester) open() (encoding.Encoding, error) {
-	switch h.Config.InputType {
+	switch h.config.InputType {
 	case config.StdinInputType:
 		return h.openStdin()
 	case config.LogInputType:
diff --git a/filebeat/harvester/harvester_test.go b/filebeat/harvester/harvester_test.go
index 51a7ac2582a1..0b32e0408dc2 100644
--- a/filebeat/harvester/harvester_test.go
+++ b/filebeat/harvester/harvester_test.go
@@ -14,9 +14,9 @@ import (
 func TestExampleTest(t *testing.T) {
 
 	h := Harvester{
-		Path:   "/var/log/",
+		path:   "/var/log/",
 		offset: 0,
 	}
 
-	assert.Equal(t, "/var/log/", h.Path)
+	assert.Equal(t, "/var/log/", h.path)
 }
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index dba9addba06c..0a3213419e65 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -24,7 +24,7 @@ func (h *Harvester) Harvest() {
 	// Makes sure file is properly closed when the harvester is stopped
 	defer h.close()
 
-	h.State.Finished = false
+	h.state.Finished = false
 
 	enc, err := h.open()
 	if err != nil {
@@ -32,12 +32,12 @@ func (h *Harvester) Harvest() {
 		return
 	}
 
-	logp.Info("Harvester started for file: %s", h.Path)
+	logp.Info("Harvester started for file: %s", h.path)
 
 	// TODO: NewLineReader uses additional buffering to deal with encoding and testing
 	// for new lines in input stream. Simple 8-bit based encodings, or plain
 	// don't require 'complicated' logic.
-	cfg := h.Config
+	cfg := h.config
 	readerConfig := reader.LogFileReaderConfig{
 		CloseRemoved: cfg.CloseRemoved,
 		CloseRenamed: cfg.CloseRenamed,
@@ -57,7 +57,7 @@ func (h *Harvester) Harvest() {
 	}
 
 	// Always report the state before starting a harvester
-	if !h.SendStateUpdate() {
+	if !h.sendStateUpdate() {
 		return
 	}
 
@@ -74,14 +74,14 @@ func (h *Harvester) Harvest() {
 		if err != nil {
 			switch err {
 			case reader.ErrFileTruncate:
-				logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
+				logp.Info("File was truncated. Begin reading file from offset 0: %s", h.path)
 				h.SetOffset(0)
 			case reader.ErrRemoved:
-				logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path)
+				logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.path)
 			case reader.ErrRenamed:
-				logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.Path)
+				logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.path)
 			case io.EOF:
-				logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.Path)
+				logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.path)
 			default:
 				logp.Info("Read line error: %s", err)
 			}
@@ -94,7 +94,6 @@ func (h *Harvester) Harvest() {
 		event := h.createEvent()
 
 		if h.shouldExportLine(text) {
-			event.ReadTime = ts
 			event.Bytes = bytesRead
 			event.Text = &text
 
@@ -113,19 +112,17 @@
 // of a harvester
 func (h *Harvester) createEvent() *input.FileEvent {
 	event := &input.FileEvent{
-		EventMetadata: h.Config.EventMetadata,
-		Source:        h.Path,
-		InputType:     h.Config.InputType,
-		DocumentType:  h.Config.DocumentType,
+		EventMetadata: h.config.EventMetadata,
+		Source:        h.path,
+		InputType:     h.config.InputType,
+		DocumentType:  h.config.DocumentType,
 		Offset:        h.getOffset(),
 		Bytes:         0,
-		Fileinfo:      h.State.Fileinfo,
-		JSONConfig:    h.Config.JSON,
+		Fileinfo:      h.state.Fileinfo,
+		JSONConfig:    h.config.JSON,
+		State:         h.getState(),
 	}
 
-	if h.Config.InputType != config.StdinInputType {
-		event.FileState = h.GetState()
-	}
 	return event
 }
 
@@ -135,7 +132,7 @@ func (h *Harvester) sendEvent(event *input.FileEvent) bool {
 	select {
 	case <-h.done:
 		return false
-	case h.SpoolerChan <- event: // ship the new event downstream
+	case h.prospectorChan <- event: // ship the new event downstream
 		return true
 	}
 }
@@ -169,9 +166,9 @@ func (h *Harvester) shouldExportLine(line string) bool {
 func (h *Harvester) openFile() (encoding.Encoding, error) {
 	var encoding encoding.Encoding
 
-	f, err := file.ReadOpen(h.Path)
+	f, err := file.ReadOpen(h.path)
 	if err != nil {
-		logp.Err("Failed opening %s: %s", h.Path, err)
+		logp.Err("Failed opening %s: %s", h.path, err)
 		return nil, err
 	}
 
@@ -182,11 +179,11 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
 
 	info, err := f.Stat()
 	if err != nil {
-		logp.Err("Failed getting stats for file %s: %s", h.Path, err)
+		logp.Err("Failed getting stats for file %s: %s", h.path, err)
 		return nil, err
 	}
 	// Compares the stat of the opened file to the state given by the prospector. Abort if not match.
-	if !os.SameFile(h.State.Fileinfo, info) {
+	if !os.SameFile(h.state.Fileinfo, info) {
 		return nil, errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
 	}
 
@@ -219,20 +216,20 @@ func (h *Harvester) initFileOffset(file *os.File) error {
 
 		// continue from last known offset
 		logp.Debug("harvester",
-			"harvest: %q position:%d (offset snapshot:%d)", h.Path, h.getOffset(), offset)
+			"harvest: %q position:%d (offset snapshot:%d)", h.path, h.getOffset(), offset)
 		_, err = file.Seek(h.getOffset(), os.SEEK_SET)
 
-	} else if h.Config.TailFiles {
+	} else if h.config.TailFiles {
 		// tail file if file is new and tail_files config is set
 		logp.Debug("harvester",
-			"harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset)
+			"harvest: (tailing) %q (offset snapshot:%d)", h.path, offset)
 
 		offset, err = file.Seek(0, os.SEEK_END)
 		h.SetOffset(offset)
 	} else {
 		// get offset from file in case of encoding factory was
 		// required to read some data.
-		logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset)
+		logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.path, offset)
 		h.SetOffset(offset)
 	}
 
@@ -251,43 +248,45 @@ func (h *Harvester) updateOffset(increment int64) {
 	h.offset += increment
 }
 
-// SendStateUpdate send an empty event with the current state to update the registry
-func (h *Harvester) SendStateUpdate() bool {
-	logp.Debug("harvester", "Update state: %s, offset: %v", h.Path, h.offset)
+// sendStateUpdate send an empty event with the current state to update the registry
+func (h *Harvester) sendStateUpdate() bool {
+	logp.Debug("harvester", "Update state: %s, offset: %v", h.path, h.offset)
 	return h.sendEvent(h.createEvent())
 }
 
-func (h *Harvester) GetState() file.State {
-	h.stateMutex.Lock()
-	defer h.stateMutex.Unlock()
+func (h *Harvester) getState() file.State {
+
+	if h.config.InputType == config.StdinInputType {
+		return file.State{}
+	}
 
 	h.refreshState()
-	return h.State
+	return h.state
 }
 
 // refreshState refreshes the values in State with the values from the harvester itself
 func (h *Harvester) refreshState() {
-	h.State.Source = h.Path
-	h.State.Offset = h.getOffset()
-	h.State.FileStateOS = file.GetOSState(h.State.Fileinfo)
+	h.state.Source = h.path
+	h.state.Offset = h.getOffset()
+	h.state.FileStateOS = file.GetOSState(h.state.Fileinfo)
 }
 
 func (h *Harvester) close() {
 	// Mark harvester as finished
-	h.State.Finished = true
+	h.state.Finished = true
 
 	// On completion, push offset so we can continue where we left off if we relaunch on the same file
-	h.SendStateUpdate()
+	h.sendStateUpdate()
 
-	logp.Debug("harvester", "Stopping harvester for file: %s", h.Path)
+	logp.Debug("harvester", "Stopping harvester for file: %s", h.path)
 
 	// Make sure file is closed as soon as harvester exits
 	// If file was never opened, it can't be closed
 	if h.file != nil {
 		h.file.Close()
-		logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path)
+		logp.Debug("harvester", "Stopping harvester, closing file: %s", h.path)
 	} else {
-		logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.Path)
+		logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.path)
 	}
 }
 
diff --git a/filebeat/input/event.go b/filebeat/input/event.go
index afada9f2478f..07aa6cbc46ef 100644
--- a/filebeat/input/event.go
+++ b/filebeat/input/event.go
@@ -25,7 +25,13 @@ type FileEvent struct {
 	Fileinfo     os.FileInfo
 	JSONFields   common.MapStr
 	JSONConfig   *processor.JSONConfig
-	FileState    file.State
+	State        file.State
+}
+
+func NewEvent(state file.State) *FileEvent {
+	return &FileEvent{
+		State: state,
+	}
 }
 
 func (f *FileEvent) ToMapStr() common.MapStr {
diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go
index 73d1844d9968..aafb0f531e8b 100644
--- a/filebeat/prospector/prospector.go
+++ b/filebeat/prospector/prospector.go
@@ -107,14 +107,14 @@ func (p *Prospector) Run() {
 		case event := <-p.harvesterChan:
 			// Add ttl if cleanOlder is enabled
 			if p.config.CleanOlder > 0 {
-				event.FileState.TTL = p.config.CleanOlder
+				event.State.TTL = p.config.CleanOlder
 			}
 			select {
 			case <-p.done:
 				logp.Info("Prospector channel stopped")
 				return
 			case p.spoolerChan <- event:
-				p.states.Update(event.FileState)
+				p.states.Update(event.State)
 			}
 		}
 	}
diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go
index 0910e873ef69..d6ddae5dfbff 100644
--- a/filebeat/prospector/prospector_log.go
+++ b/filebeat/prospector/prospector_log.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/elastic/beats/filebeat/harvester"
+	"github.com/elastic/beats/filebeat/input"
 	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/libbeat/logp"
 )
@@ -13,7 +14,6 @@ import (
 type ProspectorLog struct {
 	Prospector *Prospector
 	config     prospectorConfig
-	lastScan   time.Time
 	lastClean  time.Time
 }
 
@@ -65,8 +65,8 @@ func (p *ProspectorLog) Run() {
 			_, err := os.Stat(state.Source)
 			if err != nil {
 				state.TTL = 0
-				h, _ := p.Prospector.createHarvester(state)
-				h.SendStateUpdate()
+				event := input.NewEvent(state)
+				p.Prospector.harvesterChan <- event
 				logp.Debug("prospector", "Cleanup state for file as file removed: %s", state.Source)
 			}
 		}
@@ -123,8 +123,6 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
 
 // Scan starts a scanGlob for each provided path/glob
 func (p *ProspectorLog) scan() {
-	newLastScan := time.Now()
-
 	// TODO: Track harvesters to prevent any file from being harvested twice. Finished state could be delayed?
 	// Now let's do one quick scan to pick up new files
 	for f, fileinfo := range p.getFiles() {
@@ -144,9 +142,6 @@ func (p *ProspectorLog) scan() {
 			p.harvestExistingFile(newState, lastState)
 		}
 	}
-
-	// Only update lastScan timestamp after scan is completed
-	p.lastScan = newLastScan
 }
 
 // harvestNewFile harvest a new file
@@ -182,11 +177,11 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
 		// or no new lines were detected. It sends only an event status update to make sure the new name is persisted.
 		logp.Debug("prospector", "File rename was detected, updating state: %s -> %s, Current offset: %v",
 			oldState.Source, newState.Source, oldState.Offset)
-		h, _ := p.Prospector.createHarvester(newState)
-		h.SetOffset(oldState.Offset)
 
-		// Update state because of file rotation
-		h.SendStateUpdate()
+		newState.Offset = oldState.Offset
+		event := input.NewEvent(newState)
+		p.Prospector.harvesterChan <- event
+
 	} else {
 		// TODO: improve logging depedent on what the exact reason is that harvesting does not continue
 		// Nothing to do. Harvester is still running and file was not renamed
diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go
index ebd9274fdb33..727c05fc1275 100644
--- a/filebeat/registrar/registrar.go
+++ b/filebeat/registrar/registrar.go
@@ -199,7 +199,7 @@ func (r *Registrar) processEventStates(events []*FileEvent) {
 		if event.InputType == cfg.StdinInputType {
 			continue
 		}
-		r.states.Update(event.FileState)
+		r.states.Update(event.State)
 	}
 }
 
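For reference, a minimal sketch (not part of the patch) of the state-update path after this change: instead of creating a throwaway harvester and calling SendStateUpdate, the prospector now wraps the state in an event via input.NewEvent and pushes it onto the harvester channel, where Prospector.Run copies event.State into its state registry. The helper name forwardStateUpdate below is hypothetical; input.NewEvent, file.State and the chan *input.FileEvent channel type are taken from the diff above.

package prospector

import (
	"github.com/elastic/beats/filebeat/input"
	"github.com/elastic/beats/filebeat/input/file"
)

// forwardStateUpdate persists a state change (for example a removed or renamed
// file) without starting a harvester: the event carries only the state, no line data.
func forwardStateUpdate(harvesterChan chan *input.FileEvent, state file.State) {
	event := input.NewEvent(state)
	harvesterChan <- event // Prospector.Run forwards event.State to p.states.Update
}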