Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve prospector state handling #2840

Merged
merged 1 commit into from
Oct 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
*Topbeat*

*Filebeat*

- Add command line option -once to run filebeat only once and then close. {pull}2456[2456]
- Only load matching states into prospector to improve state handling {pull}2840[2840]
- Reset the TTL of all states on startup to make sure it is overwritten by the new config {pull}2840[2840]

*Winlogbeat*

Expand Down
8 changes: 4 additions & 4 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewState(fileInfo os.FileInfo, path string) State {
Finished: false,
FileStateOS: GetOSState(fileInfo),
Timestamp: time.Now(),
TTL: -1 * time.Second, // By default, state does have an infinite ttl
TTL: -1, // By default, state does have an infinite ttl
}
}

Expand Down Expand Up @@ -102,11 +102,11 @@ func (s *States) Cleanup() int {

for _, state := range s.states {

ttl := state.TTL
expired := (state.TTL > 0 && currentTime.Sub(state.Timestamp) > state.TTL)

if ttl == 0 || (ttl > 0 && currentTime.Sub(state.Timestamp) > ttl) {
if state.TTL == 0 || expired {
if state.Finished {
logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl)
logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL)
continue // drop state
} else {
logp.Err("State for %s should have been dropped, but couldn't as state is not finished.", state.Source)
Expand Down
13 changes: 8 additions & 5 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Prospector struct {
}

type Prospectorer interface {
Init()
Init(states []file.State) error
Run()
}

Expand All @@ -49,8 +49,8 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
outlet: outlet,
harvesterChan: make(chan *input.Event),
done: make(chan struct{}),
states: states.Copy(),
wg: sync.WaitGroup{},
states: &file.States{},
channelWg: sync.WaitGroup{},
}

Expand All @@ -61,7 +61,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
return nil, err
}

err := prospector.Init()
err := prospector.Init(states.GetStates())
if err != nil {
return nil, err
}
Expand All @@ -72,7 +72,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
}

// Init sets up default config for prospector
func (p *Prospector) Init() error {
func (p *Prospector) Init(states []file.State) error {

var prospectorer Prospectorer
var err error
Expand All @@ -90,7 +90,10 @@ func (p *Prospector) Init() error {
return err
}

prospectorer.Init()
err = prospectorer.Init(states)
if err != nil {
return err
}
p.prospectorer = prospectorer

// Create empty harvester to check if configs are fine
Expand Down
64 changes: 27 additions & 37 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package prospector

import (
"expvar"
"os"
"path/filepath"
"runtime"
"strings"
"time"

"expvar"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
Expand All @@ -21,7 +22,6 @@ var (
type ProspectorLog struct {
Prospector *Prospector
config prospectorConfig
lastClean time.Time
}

func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
Expand All @@ -34,37 +34,27 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
return prospectorer, nil
}

func (p *ProspectorLog) Init() {
// Init sets up the prospector
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the prospector will be loaded and updated. All other states will not be touched.
func (p *ProspectorLog) Init(states []file.State) error {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

logp.Info("Load previous states from registry into memory")
fileStates := p.Prospector.states.GetStates()

// Make sure all states are set as finished
for _, state := range fileStates {

for _, state := range states {
// Check if state source belongs to this prospector. If yes, update the state.
if p.hasFile(state.Source) {
// Set all states again to infinity TTL to make sure only removed if config still same
// clean_inactive / clean_removed could have been changed between restarts
if p.matchesFile(state.Source) {
state.TTL = -1

// Update prospector states and send new states to registry
err := p.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("Problem putting initial state: %+v", err)
return err
}
} else {
// Only update internal state, do not report it to registry
// Having all states could be useful in case later a file is moved into this prospector
// TODO: Think about if this is expected or unexpected
p.Prospector.states.Update(state)
}
}

p.lastClean = time.Now()

logp.Info("Previous states loaded: %v", p.Prospector.states.Count())
logp.Info("Prospector with previous states loaded: %v", p.Prospector.states.Count())
return nil
}

func (p *ProspectorLog) Run() {
Expand Down Expand Up @@ -168,26 +158,26 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
return paths
}

// hasFile returns true in case the given filePath is part of this prospector
func (p *ProspectorLog) hasFile(filePath string) bool {
// matchesFile returns true if the given filePath matches one of this prospector's glob patterns and is not excluded
func (p *ProspectorLog) matchesFile(filePath string) bool {
for _, glob := range p.config.Paths {
// Evaluate the path as a wildcards/shell glob
matches, err := filepath.Glob(glob)

if runtime.GOOS == "windows" {
// Windows allows / slashes which makes glob patterns with / work
// But for match we need paths with \ as only file names are compared and no lookup happens
glob = strings.Replace(glob, "/", "\\", -1)
}

// Evaluate if glob matches filePath
match, err := filepath.Match(glob, filePath)
if err != nil {
logp.Debug("prospector", "Error matching glob: %s", err)
continue
}

// Check any matched files to see if we need to start a harvester
for _, file := range matches {

// check if the file is in the exclude_files list
if p.isFileExcluded(file) {
continue
}

if filePath == file {
return true
}
// Check if file is not excluded
if match && !p.isFileExcluded(filePath) {
return true
}
}
return false
Expand Down
151 changes: 151 additions & 0 deletions filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// +build !windows

package prospector

import (
"regexp"
"testing"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"

"github.com/stretchr/testify/assert"
)

// matchTests is the table driving TestMatchFile: each entry pairs a file path
// with the prospector globs and exclude patterns, plus the expected match result.
var matchTests = []struct {
	file         string
	paths        []string
	excludeFiles []*regexp.Regexp
	result       bool
}{
	// Glob covering the whole directory.
	{
		file:   "test/test.log",
		paths:  []string{"test/*"},
		result: true,
	},
	// File lives in a different directory than the glob.
	{
		file:   "notest/test.log",
		paths:  []string{"test/*"},
		result: false,
	},
	// Glob restricted to the file's extension.
	{
		file:   "test/test.log",
		paths:  []string{"test/*.log"},
		result: true,
	},
	// Glob restricted to a different extension.
	{
		file:   "test/test.log",
		paths:  []string{"test/*.nolog"},
		result: false,
	},
	// Glob matches, but the exclude pattern matches the file as well.
	{
		file:         "test/test.log",
		paths:        []string{"test/*"},
		excludeFiles: []*regexp.Regexp{regexp.MustCompile("test.log")},
		result:       false,
	},
	// Glob matches and the exclude pattern does not apply to the file.
	{
		file:         "test/test.log",
		paths:        []string{"test/*"},
		excludeFiles: []*regexp.Regexp{regexp.MustCompile("test2.log")},
		result:       true,
	},
}

// TestMatchFile runs every entry of matchTests through matchesFile and
// compares the outcome with the expected result of that entry.
func TestMatchFile(t *testing.T) {
	for _, tc := range matchTests {
		cfg := prospectorConfig{
			Paths:        tc.paths,
			ExcludeFiles: tc.excludeFiles,
		}
		p := ProspectorLog{config: cfg}

		got := p.matchesFile(tc.file)
		assert.Equal(t, tc.result, got)
	}
}

// initStateTests is the table driving TestInit: each entry holds the states
// passed to Init, the prospector glob patterns, and the number of states
// expected in the prospector afterwards.
var initStateTests = []struct {
	states []file.State // list of states
	paths  []string     // prospector glob
	count  int          // expected states in prospector
}{
	{
		states: []file.State{
			{Source: "test"},
		},
		paths: []string{"test"},
		count: 1,
	},
	{
		states: []file.State{
			{Source: "notest"},
		},
		paths: []string{"test"},
		count: 0,
	},
	{
		states: []file.State{
			{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
			{Source: "test2.log", FileStateOS: file.StateOS{Inode: 2}},
		},
		paths: []string{"*.log"},
		count: 2,
	},
	{
		states: []file.State{
			{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
			{Source: "test2.log", FileStateOS: file.StateOS{Inode: 2}},
		},
		paths: []string{"test1.log"},
		count: 1,
	},
	{
		states: []file.State{
			{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
			{Source: "test2.log", FileStateOS: file.StateOS{Inode: 2}},
		},
		paths: []string{"test.log"},
		count: 0,
	},
	{
		// Both entries carry the same inode, so only 1 state is expected
		// (this is only a theoretical case).
		states: []file.State{
			{Source: "test1.log", FileStateOS: file.StateOS{Inode: 1}},
			{Source: "test2.log", FileStateOS: file.StateOS{Inode: 1}},
		},
		paths: []string{"*.log"},
		count: 1,
	},
}

// TestInit checks that a prospector holds the expected number of states after
// the init phase, i.e. only the states that match its globs and are not
// covered by exclude files.
func TestInit(t *testing.T) {
	for _, tc := range initStateTests {
		// Start every case from an empty state set and a no-op outlet.
		prospector := &Prospector{
			states: &file.States{},
			outlet: TestOutlet{},
		}
		p := ProspectorLog{
			Prospector: prospector,
			config:     prospectorConfig{Paths: tc.paths},
		}

		err := p.Init(tc.states)
		assert.NoError(t, err)

		loaded := p.Prospector.states.Count()
		assert.Equal(t, tc.count, loaded)
	}
}

// TestOutlet is an outlet without any state, used for testing.
type TestOutlet struct{}

// OnEvent ignores the given event and always returns true.
func (TestOutlet) OnEvent(event *input.Event) bool {
	return true
}
47 changes: 47 additions & 0 deletions filebeat/prospector/prospector_log_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// +build windows

package prospector

import (
"regexp"
"testing"

"github.com/stretchr/testify/assert"
)

// matchTestsWindows is the table driving TestMatchFileWindows. File paths are
// always given in Windows format, while the globs may use forward slashes or
// a mix of both separators.
var matchTestsWindows = []struct {
	file         string
	paths        []string
	excludeFiles []*regexp.Regexp
	result       bool
}{
	// Glob written with forward slashes.
	{
		file:   "C:\\\\hello\\test\\test.log",
		paths:  []string{"C:\\\\hello/test/*.log"},
		result: true,
	},
	// Glob mixing backslashes and forward slashes.
	{
		file:   "C:\\\\hello\\test\\test.log",
		paths:  []string{"C:\\\\hello\\test/*.log"},
		result: true,
	},
}

// TestMatchFileWindows tests whether matching works correctly on Windows.
// Separate tests are needed on Windows because of the automated path conversion.
func TestMatchFileWindows(t *testing.T) {
	for _, tc := range matchTestsWindows {
		cfg := prospectorConfig{
			Paths:        tc.paths,
			ExcludeFiles: tc.excludeFiles,
		}
		p := ProspectorLog{config: cfg}

		got := p.matchesFile(tc.file)
		assert.Equal(t, tc.result, got)
	}
}
Loading