Skip to content

Commit

Permalink
Try getting race conditions under control
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin committed Apr 25, 2016
1 parent f15f3f0 commit fe8b9dc
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
4 changes: 2 additions & 2 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ func (p *Prospector) Stop() {
logp.Info("Stopping Prospector")
close(p.done)

logp.Debug("prospector", "Stopping %d harvesters.", len(p.harvesters))
//logp.Debug("prospector", "Stopping %d harvesters.", len(p.harvesters))
for _, h := range p.harvesters {
go h.Stop()
}
logp.Debug("prospector", "Waiting for %d harvesters to stop", len(p.harvesters))
//logp.Debug("prospector", "Waiting for %d harvesters to stop", len(p.harvesters))
p.harvestersWaitGroup.Wait()

}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
)

type Harvester struct {
Expand All @@ -38,6 +37,7 @@ type Harvester struct {
SpoolerChan chan *input.FileEvent
encoding encoding.EncodingFactory
file FileSource /* the file being watched */
fileLock sync.Mutex
ExcludeLinesRegexp []*regexp.Regexp
IncludeLinesRegexp []*regexp.Regexp
done chan struct{}
Expand Down Expand Up @@ -82,6 +82,6 @@ func (h *Harvester) Start() {
}

func (h *Harvester) Stop() {
logp.Debug("harvester", "Stopping harvester: %v", h.Id)
//logp.Debug("harvester", "Stopping harvester: %v", h.Id)
close(h.done)
}
39 changes: 30 additions & 9 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (h *Harvester) Harvest() {

// Make sure file is closed as soon as harvester exits
// If file was never properly opened, it can't be closed
if h.file != nil {
h.file.Close()
if h.getFile() != nil {
h.closeFile()
logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path)
} else {
logp.Debug("harvester", "Stopping harvester, NOT closing file as file info not available: %s", h.Path)
Expand All @@ -38,7 +38,7 @@ func (h *Harvester) Harvest() {
return
}

h.fileInfo, err = h.file.Stat()
h.fileInfo, err = h.getFile().Stat()
if err != nil {
logp.Err("Stop Harvesting. Unexpected file stat rror: %s", err)
return
Expand All @@ -59,7 +59,7 @@ func (h *Harvester) Harvest() {
}

reader, err := createLineReader(
h.file, enc, config.BufferSize, config.MaxBytes, readerConfig,
h.getFile(), enc, config.BufferSize, config.MaxBytes, readerConfig,
config.JSON, config.Multiline, h.done)

if err != nil {
Expand All @@ -71,7 +71,7 @@ func (h *Harvester) Harvest() {
// Closes file so readLine returns error
// TODO: What happens to this if h.done never closed?
<-h.done
h.file.Close()
h.closeFile()
}()

// Report status harvester
Expand All @@ -87,7 +87,7 @@ func (h *Harvester) Harvest() {
ts, text, bytesRead, jsonFields, err := readLine(reader)
if err != nil {
if err == errFileTruncate {
seeker, ok := h.file.(io.Seeker)
seeker, ok := h.getFile().(io.Seeker)
if !ok {
logp.Err("can not seek source")
return
Expand Down Expand Up @@ -182,8 +182,8 @@ func (h *Harvester) open() (encoding.Encoding, error) {
}

func (h *Harvester) openStdin() (encoding.Encoding, error) {
h.file = pipeSource{os.Stdin}
return h.encoding(h.file)
h.setFile(pipeSource{os.Stdin})
return h.encoding(h.getFile())
}

// openFile opens a file and checks for the encoding. In case the encoding cannot be detected
Expand Down Expand Up @@ -225,7 +225,7 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
}

// yay, open file
h.file = fileSource{file}
h.setFile(fileSource{file})
return encoding, nil
}

Expand Down Expand Up @@ -281,3 +281,24 @@ func (h *Harvester) GetOffset() int64 {

return h.offset
}

func (h *Harvester) getFile() FileSource {
h.fileLock.Lock()
defer h.fileLock.Unlock()

return h.file
}

func (h *Harvester) setFile(file FileSource) {
h.fileLock.Lock()
defer h.fileLock.Unlock()

h.file = file
}

func (h *Harvester) closeFile() {
h.fileLock.Lock()
defer h.fileLock.Unlock()

h.file.Close()
}

0 comments on commit fe8b9dc

Please sign in to comment.