Enable Prospector to Update state without Harvester (#1978)
Until now it was always necessary to create a harvester to update the state. This change allows the prospector to update a state directly, for example when it detects that a file was renamed but no new content is available, so no harvester has to be started.

Changes:
* Rename FileState to State in FileEvent
* Remove lastScan time as it is no longer needed
* Clean up state handling; enable the prospector to update state directly
* Make sendStateUpdate private
* Make state private in harvester
* Remove the state mutex as it is no longer needed
* Rename spoolerChan to prospectorChan, as events are sent to the prospector first
* Make the path variable private
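
To illustrate the new flow, here is a minimal, self-contained Go sketch. The State and FileEvent types below are simplified stand-ins for filebeat's file.State and input.FileEvent (not the real package layout); it shows how the prospector can now publish a state update directly, e.g. after a rename, without creating a harvester.

```go
package main

import "fmt"

// State is a simplified stand-in for filebeat's file.State.
type State struct {
	Source string
	Offset int64
}

// FileEvent is a simplified stand-in for input.FileEvent; only State matters here.
type FileEvent struct {
	State State
}

// NewEvent mirrors the helper added in filebeat/input/event.go: an event that
// carries nothing but a state update.
func NewEvent(state State) *FileEvent {
	return &FileEvent{State: state}
}

func main() {
	// Stand-in for the prospector's harvesterChan.
	harvesterChan := make(chan *FileEvent, 1)

	// A file was renamed: carry the old offset over to the new path and
	// publish the state directly; no harvester is created or started.
	oldState := State{Source: "/var/log/app.log", Offset: 1024}
	newState := State{Source: "/var/log/app.log.1", Offset: oldState.Offset}

	harvesterChan <- NewEvent(newState)

	event := <-harvesterChan
	fmt.Printf("state update: %s offset=%d\n", event.State.Source, event.State.Offset)
}
```

In the real code the prospector sends input.NewEvent(state) onto its harvesterChan (see the prospector_log.go diff below), and Prospector.Run copies event.State into its state registry before forwarding the event to the spooler.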
ruflin authored and tsg committed Jul 8, 2016
1 parent 14c8970 commit f9754a6
Showing 7 changed files with 78 additions and 80 deletions.
38 changes: 18 additions & 20 deletions filebeat/harvester/harvester.go
@@ -16,7 +16,6 @@ package harvester
 import (
 	"fmt"
 	"regexp"
-	"sync"
 
 	"github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester/encoding"
@@ -27,12 +26,11 @@ import (
 )
 
 type Harvester struct {
-	Path string /* the file path to harvest */
-	Config harvesterConfig
+	path string /* the file path to harvest */
+	config harvesterConfig
 	offset int64
-	State file.State
-	stateMutex sync.Mutex
-	SpoolerChan chan *input.FileEvent
+	state file.State
+	prospectorChan chan *input.FileEvent
 	encoding encoding.EncodingFactory
 	file source.FileSource /* the file being watched */
 	ExcludeLinesRegexp []*regexp.Regexp
@@ -44,42 +42,42 @@ func NewHarvester(
 	cfg *common.Config,
 	path string,
 	state file.State,
-	spooler chan *input.FileEvent,
+	prospectorChan chan *input.FileEvent,
 	offset int64,
 	done chan struct{},
 ) (*Harvester, error) {
 
 	h := &Harvester{
-		Path: path,
-		Config: defaultConfig,
-		State: state,
-		SpoolerChan: spooler,
-		offset: offset,
-		done: done,
+		path: path,
+		config: defaultConfig,
+		state: state,
+		prospectorChan: prospectorChan,
+		offset: offset,
+		done: done,
 	}
 
-	if err := cfg.Unpack(&h.Config); err != nil {
+	if err := cfg.Unpack(&h.config); err != nil {
 		return nil, err
 	}
-	if err := h.Config.Validate(); err != nil {
+	if err := h.config.Validate(); err != nil {
 		return nil, err
 	}
 
-	encoding, ok := encoding.FindEncoding(h.Config.Encoding)
+	encoding, ok := encoding.FindEncoding(h.config.Encoding)
 	if !ok || encoding == nil {
-		return nil, fmt.Errorf("unknown encoding('%v')", h.Config.Encoding)
+		return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
 	}
 	h.encoding = encoding
 
-	h.ExcludeLinesRegexp = h.Config.ExcludeLines
-	h.IncludeLinesRegexp = h.Config.IncludeLines
+	h.ExcludeLinesRegexp = h.config.ExcludeLines
+	h.IncludeLinesRegexp = h.config.IncludeLines
 	return h, nil
 }
 
 // open does open the file given under h.Path and assigns the file handler to h.file
 func (h *Harvester) open() (encoding.Encoding, error) {
 
-	switch h.Config.InputType {
+	switch h.config.InputType {
 	case config.StdinInputType:
 		return h.openStdin()
 	case config.LogInputType:
4 changes: 2 additions & 2 deletions filebeat/harvester/harvester_test.go
@@ -14,9 +14,9 @@ import (
 func TestExampleTest(t *testing.T) {
 
 	h := Harvester{
-		Path:   "/var/log/",
+		path:   "/var/log/",
 		offset: 0,
 	}
 
-	assert.Equal(t, "/var/log/", h.Path)
+	assert.Equal(t, "/var/log/", h.path)
 }
83 changes: 41 additions & 42 deletions filebeat/harvester/log.go
@@ -24,20 +24,20 @@ func (h *Harvester) Harvest() {
 	// Makes sure file is properly closed when the harvester is stopped
 	defer h.close()
 
-	h.State.Finished = false
+	h.state.Finished = false
 
 	enc, err := h.open()
 	if err != nil {
 		logp.Err("Stop Harvesting. Unexpected file opening error: %s", err)
 		return
 	}
 
-	logp.Info("Harvester started for file: %s", h.Path)
+	logp.Info("Harvester started for file: %s", h.path)
 
 	// TODO: NewLineReader uses additional buffering to deal with encoding and testing
 	// for new lines in input stream. Simple 8-bit based encodings, or plain
 	// don't require 'complicated' logic.
-	cfg := h.Config
+	cfg := h.config
 	readerConfig := reader.LogFileReaderConfig{
 		CloseRemoved: cfg.CloseRemoved,
 		CloseRenamed: cfg.CloseRenamed,
@@ -57,7 +57,7 @@
 	}
 
 	// Always report the state before starting a harvester
-	if !h.SendStateUpdate() {
+	if !h.sendStateUpdate() {
 		return
 	}
 
@@ -74,14 +74,14 @@
 		if err != nil {
 			switch err {
 			case reader.ErrFileTruncate:
-				logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
+				logp.Info("File was truncated. Begin reading file from offset 0: %s", h.path)
 				h.SetOffset(0)
 			case reader.ErrRemoved:
-				logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path)
+				logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.path)
 			case reader.ErrRenamed:
-				logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.Path)
+				logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.path)
 			case io.EOF:
-				logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.Path)
+				logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.path)
 			default:
 				logp.Info("Read line error: %s", err)
@@ -94,7 +94,6 @@
 		event := h.createEvent()
 
 		if h.shouldExportLine(text) {
-
 			event.ReadTime = ts
 			event.Bytes = bytesRead
 			event.Text = &text
@@ -113,19 +112,17 @@
 // of a harvester
 func (h *Harvester) createEvent() *input.FileEvent {
 	event := &input.FileEvent{
-		EventMetadata: h.Config.EventMetadata,
-		Source: h.Path,
-		InputType: h.Config.InputType,
-		DocumentType: h.Config.DocumentType,
+		EventMetadata: h.config.EventMetadata,
+		Source: h.path,
+		InputType: h.config.InputType,
+		DocumentType: h.config.DocumentType,
 		Offset: h.getOffset(),
 		Bytes: 0,
-		Fileinfo: h.State.Fileinfo,
-		JSONConfig: h.Config.JSON,
+		Fileinfo: h.state.Fileinfo,
+		JSONConfig: h.config.JSON,
+		State: h.getState(),
 	}
 
-	if h.Config.InputType != config.StdinInputType {
-		event.FileState = h.GetState()
-	}
 	return event
 }
 
@@ -135,7 +132,7 @@ func (h *Harvester) sendEvent(event *input.FileEvent) bool {
 	select {
 	case <-h.done:
 		return false
-	case h.SpoolerChan <- event: // ship the new event downstream
+	case h.prospectorChan <- event: // ship the new event downstream
 		return true
 	}
 }
@@ -169,9 +166,9 @@ func (h *Harvester) shouldExportLine(line string) bool {
 func (h *Harvester) openFile() (encoding.Encoding, error) {
 	var encoding encoding.Encoding
 
-	f, err := file.ReadOpen(h.Path)
+	f, err := file.ReadOpen(h.path)
 	if err != nil {
-		logp.Err("Failed opening %s: %s", h.Path, err)
+		logp.Err("Failed opening %s: %s", h.path, err)
 		return nil, err
 	}
 
@@ -182,11 +179,11 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
 
 	info, err := f.Stat()
 	if err != nil {
-		logp.Err("Failed getting stats for file %s: %s", h.Path, err)
+		logp.Err("Failed getting stats for file %s: %s", h.path, err)
 		return nil, err
 	}
 	// Compares the stat of the opened file to the state given by the prospector. Abort if not match.
-	if !os.SameFile(h.State.Fileinfo, info) {
+	if !os.SameFile(h.state.Fileinfo, info) {
 		return nil, errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
 	}
 
@@ -219,20 +216,20 @@ func (h *Harvester) initFileOffset(file *os.File) error {
 		// continue from last known offset
 
 		logp.Debug("harvester",
-			"harvest: %q position:%d (offset snapshot:%d)", h.Path, h.getOffset(), offset)
+			"harvest: %q position:%d (offset snapshot:%d)", h.path, h.getOffset(), offset)
 		_, err = file.Seek(h.getOffset(), os.SEEK_SET)
-	} else if h.Config.TailFiles {
+	} else if h.config.TailFiles {
 		// tail file if file is new and tail_files config is set
 
 		logp.Debug("harvester",
-			"harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset)
+			"harvest: (tailing) %q (offset snapshot:%d)", h.path, offset)
 		offset, err = file.Seek(0, os.SEEK_END)
 		h.SetOffset(offset)
 
 	} else {
 		// get offset from file in case of encoding factory was
 		// required to read some data.
-		logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset)
+		logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.path, offset)
 		h.SetOffset(offset)
 	}
 
@@ -251,43 +248,45 @@ func (h *Harvester) updateOffset(increment int64) {
 	h.offset += increment
 }
 
-// SendStateUpdate send an empty event with the current state to update the registry
-func (h *Harvester) SendStateUpdate() bool {
-	logp.Debug("harvester", "Update state: %s, offset: %v", h.Path, h.offset)
+// sendStateUpdate send an empty event with the current state to update the registry
+func (h *Harvester) sendStateUpdate() bool {
+	logp.Debug("harvester", "Update state: %s, offset: %v", h.path, h.offset)
 	return h.sendEvent(h.createEvent())
 }
 
-func (h *Harvester) GetState() file.State {
-	h.stateMutex.Lock()
-	defer h.stateMutex.Unlock()
+func (h *Harvester) getState() file.State {
+
+	if h.config.InputType == config.StdinInputType {
+		return file.State{}
+	}
 
 	h.refreshState()
-	return h.State
+	return h.state
 }
 
 // refreshState refreshes the values in State with the values from the harvester itself
 func (h *Harvester) refreshState() {
-	h.State.Source = h.Path
-	h.State.Offset = h.getOffset()
-	h.State.FileStateOS = file.GetOSState(h.State.Fileinfo)
+	h.state.Source = h.path
+	h.state.Offset = h.getOffset()
+	h.state.FileStateOS = file.GetOSState(h.state.Fileinfo)
 }
 
 func (h *Harvester) close() {
 	// Mark harvester as finished
-	h.State.Finished = true
+	h.state.Finished = true
 
 	// On completion, push offset so we can continue where we left off if we relaunch on the same file
-	h.SendStateUpdate()
+	h.sendStateUpdate()
 
-	logp.Debug("harvester", "Stopping harvester for file: %s", h.Path)
+	logp.Debug("harvester", "Stopping harvester for file: %s", h.path)
 
 	// Make sure file is closed as soon as harvester exits
 	// If file was never opened, it can't be closed
 	if h.file != nil {
 		h.file.Close()
-		logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path)
+		logp.Debug("harvester", "Stopping harvester, closing file: %s", h.path)
 	} else {
-		logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.Path)
+		logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.path)
 	}
 }
8 changes: 7 additions & 1 deletion filebeat/input/event.go
@@ -25,7 +25,13 @@ type FileEvent struct {
 	Fileinfo os.FileInfo
 	JSONFields common.MapStr
 	JSONConfig *processor.JSONConfig
-	FileState file.State
+	State file.State
 }
 
+func NewEvent(state file.State) *FileEvent {
+	return &FileEvent{
+		State: state,
+	}
+}
+
 func (f *FileEvent) ToMapStr() common.MapStr {
4 changes: 2 additions & 2 deletions filebeat/prospector/prospector.go
@@ -107,14 +107,14 @@ func (p *Prospector) Run() {
 		case event := <-p.harvesterChan:
 			// Add ttl if cleanOlder is enabled
 			if p.config.CleanOlder > 0 {
-				event.FileState.TTL = p.config.CleanOlder
+				event.State.TTL = p.config.CleanOlder
 			}
 			select {
 			case <-p.done:
 				logp.Info("Prospector channel stopped")
 				return
 			case p.spoolerChan <- event:
-				p.states.Update(event.FileState)
+				p.states.Update(event.State)
 			}
 		}
 	}
19 changes: 7 additions & 12 deletions filebeat/prospector/prospector_log.go
@@ -6,14 +6,14 @@ import (
 	"time"
 
 	"github.com/elastic/beats/filebeat/harvester"
+	"github.com/elastic/beats/filebeat/input"
 	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/libbeat/logp"
 )
 
 type ProspectorLog struct {
 	Prospector *Prospector
 	config     prospectorConfig
-	lastScan   time.Time
 	lastClean  time.Time
 }
 
@@ -65,8 +65,8 @@ func (p *ProspectorLog) Run() {
 		_, err := os.Stat(state.Source)
 		if err != nil {
 			state.TTL = 0
-			h, _ := p.Prospector.createHarvester(state)
-			h.SendStateUpdate()
+			event := input.NewEvent(state)
+			p.Prospector.harvesterChan <- event
 			logp.Debug("prospector", "Cleanup state for file as file removed: %s", state.Source)
 		}
 	}
@@ -123,8 +123,6 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
 // Scan starts a scanGlob for each provided path/glob
 func (p *ProspectorLog) scan() {
 
-	newLastScan := time.Now()
-
 	// TODO: Track harvesters to prevent any file from being harvested twice. Finished state could be delayed?
 	// Now let's do one quick scan to pick up new files
 	for f, fileinfo := range p.getFiles() {
@@ -144,9 +142,6 @@
 			p.harvestExistingFile(newState, lastState)
 		}
 	}
-
-	// Only update lastScan timestamp after scan is completed
-	p.lastScan = newLastScan
 }
 
 // harvestNewFile harvest a new file
@@ -182,11 +177,11 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
 		// or no new lines were detected. It sends only an event status update to make sure the new name is persisted.
 		logp.Debug("prospector", "File rename was detected, updating state: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset)
 
-		h, _ := p.Prospector.createHarvester(newState)
-		h.SetOffset(oldState.Offset)
 
 		// Update state because of file rotation
-		h.SendStateUpdate()
+		newState.Offset = oldState.Offset
+		event := input.NewEvent(newState)
+		p.Prospector.harvesterChan <- event
 
 	} else {
 		// TODO: improve logging depedent on what the exact reason is that harvesting does not continue
 		// Nothing to do. Harvester is still running and file was not renamed