Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Filebeat organisation and Cleanup #1894

Merged
merged 1 commit into from
Jun 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 44 additions & 46 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,15 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/publish"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"
)

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
FbConfig *cfg.Config
// Channel from harvesters to spooler
publisherChan chan []*input.FileEvent
spooler *Spooler
registrar *crawler.Registrar
crawler *crawler.Crawler
pub logPublisher
done chan struct{}
config *cfg.Config
done chan struct{}
}

// New creates a new Filebeat pointer instance.
Expand All @@ -32,73 +29,86 @@ func New() *Filebeat {
func (fb *Filebeat) Config(b *beat.Beat) error {

// Load Base config
err := b.RawConfig.Unpack(&fb.FbConfig)

err := b.RawConfig.Unpack(&fb.config)
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
}

// Check if optional config_dir is set to fetch additional prospector config files
fb.FbConfig.FetchConfigs()
fb.config.FetchConfigs()

return nil
}

// Setup applies the minimum required setup to a new Filebeat instance for use.
func (fb *Filebeat) Setup(b *beat.Beat) error {
	// Create the shutdown signal channel: Run blocks receiving on it,
	// and Stop closes it to release Run and trigger its deferred cleanup.
	fb.done = make(chan struct{})

	return nil
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {

var err error

// Init channels
fb.publisherChan = make(chan []*input.FileEvent, 1)
config := fb.config.Filebeat

// Setup registrar to persist state
fb.registrar, err = crawler.NewRegistrar(fb.FbConfig.Filebeat.RegistryFile)
registrar, err := registrar.New(config.RegistryFile)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
}

fb.crawler = &crawler.Crawler{
Registrar: fb.registrar,
// Channel from harvesters to spooler
publisherChan := make(chan []*input.FileEvent, 1)

// Publishes event to output
publisher := publish.New(config.PublishAsync,
publisherChan, registrar.Channel, b.Publisher.Connect())

// Init and Start spooler: Harvesters dump events into the spooler.
spooler, err := spooler.New(config, publisherChan)
if err != nil {
logp.Err("Could not init spooler: %v", err)
return err
}

// Load the previous log file locations now, for use in prospector
err = fb.registrar.LoadState()
crawler, err := crawler.New(spooler, config.Prospectors)
if err != nil {
logp.Err("Error loading state: %v", err)
logp.Err("Could not init crawler: %v", err)
return err
}

// Init and Start spooler: Harvesters dump events into the spooler.
fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan)
// The order of starting and stopping is important. Stopping is inverted to the starting order.
// The current order is: registrar, publisher, spooler, crawler
// That means, crawler is stopped first.

// Start the registrar
err = registrar.Start()
if err != nil {
logp.Err("Could not init spooler: %v", err)
return err
logp.Err("Could not start registrar: %v", err)
}
// Stopping registrar will write last state
defer registrar.Stop()

fb.registrar.Start()
fb.spooler.Start()
// Start publisher
publisher.Start()
// Stopping publisher (might potentially drop items)
defer publisher.Stop()

// Starting spooler
spooler.Start()
// Stopping spooler will flush items
defer spooler.Stop()

err = fb.crawler.Start(fb.FbConfig.Filebeat.Prospectors, fb.spooler.Channel)
err = crawler.Start(registrar.GetStates())
if err != nil {
return err
}
// Stop crawler -> stop prospectors -> stop harvesters
defer crawler.Stop()

// Publishes event to output
fb.pub = newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Publisher.Connect())
fb.pub.Start()

// Blocks progressing
// Blocks progressing. As soon as channel is closed, all defer statements come into play
<-fb.done

return nil
Expand All @@ -114,18 +124,6 @@ func (fb *Filebeat) Stop() {

logp.Info("Stopping filebeat")

// Stop crawler -> stop prospectors -> stop harvesters
fb.crawler.Stop()

// Stopping spooler will flush items
fb.spooler.Stop()

// stopping publisher (might potentially drop items)
fb.pub.Stop()

// Stopping registrar will write last state
fb.registrar.Stop()

// Stop Filebeat
close(fb.done)
}
3 changes: 0 additions & 3 deletions filebeat/beater/filebeat_test.go

This file was deleted.

38 changes: 24 additions & 14 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/spooler"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
Expand All @@ -23,27 +24,32 @@ import (
*/

type Crawler struct {
// Registrar object to persist the state
Registrar *Registrar
prospectors []*prospector.Prospector
wg sync.WaitGroup
prospectors []*prospector.Prospector
wg sync.WaitGroup
spooler *spooler.Spooler
prospectorConfigs []*common.Config
}

func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *input.FileEvent) error {
func New(spooler *spooler.Spooler, prospectorConfigs []*common.Config) (*Crawler, error) {

if len(prospectorConfigs) == 0 {
return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
return nil, fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we change the go routine to:

go func(p Prospector){
    defer c.wg.Done()
    p.Stop()
}(prospector)

The prospector shouldn't need to know that someone is waiting for Stop to finish. That would be a simple form of abstraction leak.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


logp.Info("Loading Prospectors: %v", len(prospectorConfigs))
return &Crawler{
spooler: spooler,
prospectorConfigs: prospectorConfigs,
}, nil
}

func (c *Crawler) Start(states input.States) error {

// Get existing states
states := *c.Registrar.state
logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, prospectorConfig := range prospectorConfigs {
for _, prospectorConfig := range c.prospectorConfigs {

prospector, err := prospector.NewProspector(prospectorConfig, states, eventChan)
prospector, err := prospector.NewProspector(prospectorConfig, states, c.spooler.Channel)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand All @@ -66,19 +72,23 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu
}(i, p)
}

logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count())
logp.Info("All prospectors are initialised and running with %d states to persist", states.Count())

return nil
}

func (c *Crawler) Stop() {
logp.Info("Stopping Crawler")
stopProspector := func(p *prospector.Prospector) {
defer c.wg.Done()
p.Stop()
}

logp.Info("Stopping %v prospectors", len(c.prospectors))
for _, prospector := range c.prospectors {
for _, p := range c.prospectors {
// Stop prospectors in parallel
c.wg.Add(1)
go prospector.Stop(&c.wg)
go stopProspector(p)
}
c.wg.Wait()
logp.Info("Crawler stopped")
Expand Down
7 changes: 2 additions & 5 deletions filebeat/crawler/crawler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ package crawler
import (
"testing"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

func TestCrawlerStartError(t *testing.T) {
crawler := Crawler{}
channel := make(chan *input.FileEvent, 1)
func TestNewCrawlerNoProspectorsError(t *testing.T) {
prospectorConfigs := []*common.Config{}

error := crawler.Start(prospectorConfigs, channel)
_, error := New(nil, prospectorConfigs)

assert.Error(t, error)
}
2 changes: 1 addition & 1 deletion filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (h *Harvester) Harvest() {
ts, text, bytesRead, jsonFields, err := readLine(reader)
if err != nil {
if err == errFileTruncate {
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
logp.Warn("File was truncated. Begin reading file from offset 0: %s", h.Path)
h.SetOffset(0)
return
}
Expand Down
20 changes: 1 addition & 19 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,10 @@ func (p *Prospector) Run() {
}
}

func (p *Prospector) Stop(wg *sync.WaitGroup) {
func (p *Prospector) Stop() {
logp.Info("Stopping Prospector")
close(p.done)
p.wg.Wait()
wg.Done()
}

// createHarvester creates a new harvester instance from the given state
Expand Down Expand Up @@ -169,20 +168,3 @@ func (p *Prospector) startHarvester(state input.FileState, offset int64) (*harve

return h, nil
}

// isIgnoreOlder reports whether the given state's file has not been modified
// for longer than the configured ignore_older duration.
func (p *Prospector) isIgnoreOlder(state input.FileState) bool {

	// ignore_older is disabled
	if p.config.IgnoreOlder == 0 {
		return false
	}

	// File is ignored when its last modification is older than ignore_older.
	return time.Since(state.Fileinfo.ModTime()) > p.config.IgnoreOlder
}
19 changes: 18 additions & 1 deletion filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *ProspectorLog) scan() {
// harvestNewFile harvest a new file
func (p *ProspectorLog) harvestNewFile(state input.FileState) {

if !p.Prospector.isIgnoreOlder(state) {
if !p.isIgnoreOlder(state) {
logp.Debug("prospector", "Start harvester for new file: %s", state.Source)
p.Prospector.startHarvester(state, 0)
} else {
Expand Down Expand Up @@ -176,3 +176,20 @@ func (p *ProspectorLog) isFileExcluded(file string) bool {
patterns := p.config.ExcludeFiles
return len(patterns) > 0 && harvester.MatchAnyRegexps(patterns, file)
}

// isIgnoreOlder reports whether the given state's file has not been modified
// for longer than the configured ignore_older duration.
func (p *ProspectorLog) isIgnoreOlder(state input.FileState) bool {

	// ignore_older is disabled
	if p.config.IgnoreOlder == 0 {
		return false
	}

	// File is ignored when its last modification is older than ignore_older.
	return time.Since(state.Fileinfo.ModTime()) > p.config.IgnoreOlder
}
11 changes: 5 additions & 6 deletions filebeat/prospector/prospector_stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ import (
)

type ProspectorStdin struct {
Prospector *Prospector
harvester *harvester.Harvester
started bool
harvester *harvester.Harvester
started bool
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need 'started'? How often do we expect someone to call Run in ProspectorStdin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember I introduced it because there was an issue in the past that it was started twice. Will have to check in more detail if it is still needed. Lots of changes have happened ;-) Will put in my list for future clean ups.

}

// NewProspectorStdin creates a new stdin prospector
// This prospector contains one harvester which is reading from stdin
func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {

prospectorer := &ProspectorStdin{
Prospector: p,
}
prospectorer := &ProspectorStdin{}

var err error

Expand Down
8 changes: 4 additions & 4 deletions filebeat/beater/publish.go → filebeat/publish/publish.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package beater
package publish

import (
"sync"
Expand All @@ -11,7 +11,7 @@ import (
"github.com/elastic/beats/libbeat/publisher"
)

type logPublisher interface {
type LogPublisher interface {
Start()
Stop()
}
Expand Down Expand Up @@ -61,11 +61,11 @@ const (
batchCanceled
)

func newPublisher(
func New(
async bool,
in, out chan []*input.FileEvent,
client publisher.Client,
) logPublisher {
) LogPublisher {
if async {
return newAsyncLogPublisher(in, out, client)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !integration

package beater
package publish

import (
"fmt"
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestPublisherModes(t *testing.T) {
regChan := make(chan []*input.FileEvent, len(test.order)+1)
client := pubtest.NewChanClient(0)

pub := newPublisher(test.async, pubChan, regChan, client)
pub := New(test.async, pubChan, regChan, client)
pub.Start()

var events [][]*input.FileEvent
Expand Down
Loading