From fe8b9dcf7a5f1e2a214c6208d3a39f08834c2a23 Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 25 Apr 2016 14:24:26 +0200 Subject: [PATCH] Try getting race conditions under control --- filebeat/crawler/prospector.go | 4 ++-- filebeat/harvester/harvester.go | 4 ++-- filebeat/harvester/log.go | 39 +++++++++++++++++++++++++-------- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 482c106b656..015283fed16 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -107,11 +107,11 @@ func (p *Prospector) Stop() { logp.Info("Stopping Prospector") close(p.done) - logp.Debug("prospector", "Stopping %d harvesters.", len(p.harvesters)) + //logp.Debug("prospector", "Stopping %d harvesters.", len(p.harvesters)) for _, h := range p.harvesters { go h.Stop() } - logp.Debug("prospector", "Waiting for %d harvesters to stop", len(p.harvesters)) + //logp.Debug("prospector", "Waiting for %d harvesters to stop", len(p.harvesters)) p.harvestersWaitGroup.Wait() } diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 8779a180cd6..995049993c5 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -24,7 +24,6 @@ import ( "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/libbeat/logp" ) type Harvester struct { @@ -38,6 +37,7 @@ type Harvester struct { SpoolerChan chan *input.FileEvent encoding encoding.EncodingFactory file FileSource /* the file being watched */ + fileLock sync.Mutex ExcludeLinesRegexp []*regexp.Regexp IncludeLinesRegexp []*regexp.Regexp done chan struct{} @@ -82,6 +82,6 @@ func (h *Harvester) Start() { } func (h *Harvester) Stop() { - logp.Debug("harvester", "Stopping harvester: %v", h.Id) + //logp.Debug("harvester", "Stopping harvester: %v", h.Id) close(h.done) } diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index e660c362e56..f09b421431f 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -24,8 +24,8 @@ func (h *Harvester) Harvest() { // Make sure file is closed as soon as harvester exits // If file was never properly opened, it can't be closed - if h.file != nil { - h.file.Close() + if h.getFile() != nil { + h.closeFile() logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path) } else { logp.Debug("harvester", "Stopping harvester, NOT closing file as file info not available: %s", h.Path) @@ -38,7 +38,7 @@ func (h *Harvester) Harvest() { return } - h.fileInfo, err = h.file.Stat() + h.fileInfo, err = h.getFile().Stat() if err != nil { logp.Err("Stop Harvesting. Unexpected file stat rror: %s", err) return @@ -59,7 +59,7 @@ func (h *Harvester) Harvest() { } reader, err := createLineReader( - h.file, enc, config.BufferSize, config.MaxBytes, readerConfig, + h.getFile(), enc, config.BufferSize, config.MaxBytes, readerConfig, config.JSON, config.Multiline, h.done) if err != nil { @@ -71,7 +71,7 @@ func (h *Harvester) Harvest() { // Closes file so readLine returns error // TODO: What happens to this if h.done never closed? <-h.done - h.file.Close() + h.closeFile() }() // Report status harvester @@ -87,7 +87,7 @@ func (h *Harvester) Harvest() { ts, text, bytesRead, jsonFields, err := readLine(reader) if err != nil { if err == errFileTruncate { - seeker, ok := h.file.(io.Seeker) + seeker, ok := h.getFile().(io.Seeker) if !ok { logp.Err("can not seek source") return @@ -182,8 +182,8 @@ func (h *Harvester) open() (encoding.Encoding, error) { } func (h *Harvester) openStdin() (encoding.Encoding, error) { - h.file = pipeSource{os.Stdin} - return h.encoding(h.file) + h.setFile(pipeSource{os.Stdin}) + return h.encoding(h.getFile()) } // openFile opens a file and checks for the encoding. In case the encoding cannot be detected @@ -225,7 +225,7 @@ func (h *Harvester) openFile() (encoding.Encoding, error) { } // yay, open file - h.file = fileSource{file} + h.setFile(fileSource{file}) return encoding, nil } @@ -281,3 +281,24 @@ func (h *Harvester) GetOffset() int64 { return h.offset } + +func (h *Harvester) getFile() FileSource { + h.fileLock.Lock() + defer h.fileLock.Unlock() + + return h.file +} + +func (h *Harvester) setFile(file FileSource) { + h.fileLock.Lock() + defer h.fileLock.Unlock() + + h.file = file +} + +func (h *Harvester) closeFile() { + h.fileLock.Lock() + defer h.fileLock.Unlock() + + h.file.Close() +}