Skip to content

Commit

Permalink
Improve prospector state handling (#2840)
Browse files Browse the repository at this point in the history
Previously each prospector was holding old states initially loaded by the registry file. This was changed to that each prospector only loads the states with a path that matches the glob pattern. This reduces the number of states handled by each prospector and reduces overlap.

One consequence of this is that in case a log file "moves" from one prospector to an other through renaming during the runtime of filebeat, no state will be found. But this behaviour is not expected and would lead to other issues already now. The expectation is that a file stays inside the prospector over its full lifetime.

In case of a filebeat restart including a config change, it can happen that some states are not managed anymore by a prospector. These "obsolete" states will stay in the registrar until a further config change when a glob pattern would again include these states. As an example:

Filebeat is started with the following config.

```
filebeat.prospectors:
- paths: ['*.log']
  clean_removed: true
```

There is a log file `a.log` and `b.log`. Both files are harvested and states for `a.log` and `b.log` are persisted. Then filebeat is stopped and the config file is modified to.

```
filebeat.prospectors:
- paths: ['a.log']
  clean_removed: true
```

Filebeat is started again. The prospector will now only handle the state for `a.log`. In case `b.log` is removed from disk, the state for `b.log` will stay in the registry file.

In case the config file was with `clean_inactive: 5min`, all TTL are reset on restart to `-2` by the registry and `-1` the prospector or the new `clean_inactive` value. Using `-2` in the registrar can be useful in the future to detect which states are not managed anymore by any prospector. As all TTL are reset on restart, persisting of the TTL is not required anymore but can become useful for cleanup so the information is kept in the registry file.

Further changes:

* Add tests for matchFile method
* Add tests for prospector state init filtering
* states are passed to `Prospector.Init` on startup instead of setting it directly in the object
* `Prospector.Init` and `Prospectorer.Init` now return an error and filebeat exits on startup problems
* Remove lastClean as not needed anymore
* Have one log file for each bom file test
* Update to new vagrant box with Golang 1.7.3
  • Loading branch information
ruflin authored and tsg committed Oct 31, 2016
1 parent e2509b7 commit 470d8b1
Show file tree
Hide file tree
Showing 12 changed files with 471 additions and 58 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
*Topbeat*

*Filebeat*

- Add command line option -once to run filebeat only once and then close. {pull}2456[2456]
- Only load matching states into prospector to improve state handling {pull}2840[2840]
- Reset all states ttl on startup to make sure it is overwritten by new config {pull}2840[2840]

*Winlogbeat*

Expand Down
8 changes: 4 additions & 4 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewState(fileInfo os.FileInfo, path string) State {
Finished: false,
FileStateOS: GetOSState(fileInfo),
Timestamp: time.Now(),
TTL: -1 * time.Second, // By default, state does have an infinite ttl
TTL: -1, // By default, state does have an infinite ttl
}
}

Expand Down Expand Up @@ -102,11 +102,11 @@ func (s *States) Cleanup() int {

for _, state := range s.states {

ttl := state.TTL
expired := (state.TTL > 0 && currentTime.Sub(state.Timestamp) > state.TTL)

if ttl == 0 || (ttl > 0 && currentTime.Sub(state.Timestamp) > ttl) {
if state.TTL == 0 || expired {
if state.Finished {
logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl)
logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL)
continue // drop state
} else {
logp.Err("State for %s should have been dropped, but couldn't as state is not finished.", state.Source)
Expand Down
13 changes: 8 additions & 5 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Prospector struct {
}

type Prospectorer interface {
Init()
Init(states []file.State) error
Run()
}

Expand All @@ -49,8 +49,8 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
outlet: outlet,
harvesterChan: make(chan *input.Event),
done: make(chan struct{}),
states: states.Copy(),
wg: sync.WaitGroup{},
states: &file.States{},
channelWg: sync.WaitGroup{},
}

Expand All @@ -61,7 +61,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
return nil, err
}

err := prospector.Init()
err := prospector.Init(states.GetStates())
if err != nil {
return nil, err
}
Expand All @@ -72,7 +72,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
}

// Init sets up default config for prospector
func (p *Prospector) Init() error {
func (p *Prospector) Init(states []file.State) error {

var prospectorer Prospectorer
var err error
Expand All @@ -90,7 +90,10 @@ func (p *Prospector) Init() error {
return err
}

prospectorer.Init()
err = prospectorer.Init(states)
if err != nil {
return err
}
p.prospectorer = prospectorer

// Create empty harvester to check if configs are fine
Expand Down
64 changes: 27 additions & 37 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package prospector

import (
"expvar"
"os"
"path/filepath"
"runtime"
"strings"
"time"

"expvar"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
Expand All @@ -21,7 +22,6 @@ var (
type ProspectorLog struct {
Prospector *Prospector
config prospectorConfig
lastClean time.Time
}

func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
Expand All @@ -34,37 +34,27 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
return prospectorer, nil
}

func (p *ProspectorLog) Init() {
// Init sets up the prospector
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the prospector will be loaded and updated. All other states will not be touched.
func (p *ProspectorLog) Init(states []file.State) error {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

logp.Info("Load previous states from registry into memory")
fileStates := p.Prospector.states.GetStates()

// Make sure all states are set as finished
for _, state := range fileStates {

for _, state := range states {
// Check if state source belongs to this prospector. If yes, update the state.
if p.hasFile(state.Source) {
// Set all states again to infinity TTL to make sure only removed if config still same
// clean_inactive / clean_removed could have been changed between restarts
if p.matchesFile(state.Source) {
state.TTL = -1

// Update prospector states and send new states to registry
err := p.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("Problem putting initial state: %+v", err)
return err
}
} else {
// Only update internal state, do not report it to registry
// Having all states could be useful in case later a file is moved into this prospector
// TODO: Think about if this is expected or unexpected
p.Prospector.states.Update(state)
}
}

p.lastClean = time.Now()

logp.Info("Previous states loaded: %v", p.Prospector.states.Count())
logp.Info("Prospector with previous states loaded: %v", p.Prospector.states.Count())
return nil
}

func (p *ProspectorLog) Run() {
Expand Down Expand Up @@ -168,26 +158,26 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
return paths
}

// hasFile returns true in case the given filePath is part of this prospector
func (p *ProspectorLog) hasFile(filePath string) bool {
// matchesFile returns true in case the given filePath is part of this prospector, means matches its glob patterns
func (p *ProspectorLog) matchesFile(filePath string) bool {
for _, glob := range p.config.Paths {
// Evaluate the path as a wildcards/shell glob
matches, err := filepath.Glob(glob)

if runtime.GOOS == "windows" {
// Windows allows / slashes which makes glob patterns with / work
// But for match we need paths with \ as only file names are compared and no lookup happens
glob = strings.Replace(glob, "/", "\\", -1)
}

// Evaluate if glob matches filePath
match, err := filepath.Match(glob, filePath)
if err != nil {
logp.Debug("prospector", "Error matching glob: %s", err)
continue
}

// Check any matched files to see if we need to start a harvester
for _, file := range matches {

// check if the file is in the exclude_files list
if p.isFileExcluded(file) {
continue
}

if filePath == file {
return true
}
// Check if file is not excluded
if match && !p.isFileExcluded(filePath) {
return true
}
}
return false
Expand Down
151 changes: 151 additions & 0 deletions filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// +build !windows

package prospector

import (
"regexp"
"testing"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"

"github.com/stretchr/testify/assert"
)

var matchTests = []struct {
file string
paths []string
excludeFiles []*regexp.Regexp
result bool
}{
{
"test/test.log",
[]string{"test/*"},
nil,
true,
},
{
"notest/test.log",
[]string{"test/*"},
nil,
false,
},
{
"test/test.log",
[]string{"test/*.log"},
nil,
true,
},
{
"test/test.log",
[]string{"test/*.nolog"},
nil,
false,
},
{
"test/test.log",
[]string{"test/*"},
[]*regexp.Regexp{regexp.MustCompile("test.log")},
false,
},
{
"test/test.log",
[]string{"test/*"},
[]*regexp.Regexp{regexp.MustCompile("test2.log")},
true,
},
}

func TestMatchFile(t *testing.T) {

for _, test := range matchTests {

p := ProspectorLog{
config: prospectorConfig{
Paths: test.paths,
ExcludeFiles: test.excludeFiles,
},
}

assert.Equal(t, test.result, p.matchesFile(test.file))
}
}

var initStateTests = []struct {
states []file.State // list of states
paths []string // prospector glob
count int // expected states in prospector
}{
{
[]file.State{
file.State{Source: "test"},
},
[]string{"test"},
1,
},
{
[]file.State{
file.State{Source: "notest"},
},
[]string{"test"},
0,
},
{
[]file.State{
file.State{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
file.State{Source: "test2.log", FileStateOS: file.StateOS{Inode: 2}},
},
[]string{"*.log"},
2,
},
{
[]file.State{
file.State{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
file.State{Source: "test2.log", FileStateOS: file.StateOS{Inode: 2}},
},
[]string{"test1.log"},
1,
},
{
[]file.State{
file.State{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
file.State{Source: "test2.log", FileStateOS: file.StateOS{Inode: 2}},
},
[]string{"test.log"},
0,
},
{
[]file.State{
file.State{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
file.State{Source: "test2.log", FileStateOS: file.StateOS{Inode: 1}},
},
[]string{"*.log"},
1, // Expecting only 1 state because of some inode (this is only a theoretical case)
},
}

// TestInit checks that the correct states are in a prospector after the init phase
// This means only the ones that match the glob and not exclude files
func TestInit(t *testing.T) {

for _, test := range initStateTests {
p := ProspectorLog{
Prospector: &Prospector{
states: &file.States{},
outlet: TestOutlet{},
},
config: prospectorConfig{
Paths: test.paths,
},
}
err := p.Init(test.states)
assert.NoError(t, err)
assert.Equal(t, test.count, p.Prospector.states.Count())
}

}

// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *input.Event) bool { return true }
47 changes: 47 additions & 0 deletions filebeat/prospector/prospector_log_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// +build windows

package prospector

import (
"regexp"
"testing"

"github.com/stretchr/testify/assert"
)

var matchTestsWindows = []struct {
file string
paths []string
excludeFiles []*regexp.Regexp
result bool
}{
{
"C:\\\\hello\\test\\test.log", // Path are always in windows format
[]string{"C:\\\\hello/test/*.log"}, // Globs can also be with forward slashes
nil,
true,
},
{
"C:\\\\hello\\test\\test.log", // Path are always in windows format
[]string{"C:\\\\hello\\test/*.log"}, // Globs can also be mixed
nil,
true,
},
}

// TestMatchFileWindows test if match works correctly on windows
// Separate test are needed on windows because of automated path conversion
func TestMatchFileWindows(t *testing.T) {

for _, test := range matchTestsWindows {

p := ProspectorLog{
config: prospectorConfig{
Paths: test.paths,
ExcludeFiles: test.excludeFiles,
},
}

assert.Equal(t, test.result, p.matchesFile(test.file))
}
}
Loading

0 comments on commit 470d8b1

Please sign in to comment.