From 6079993f6138d2b9818389e1f1cefe59e436ca90 Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 21 Jun 2016 16:26:49 +0200 Subject: [PATCH] Improve Filebeat organisation and Cleanup The overall goal is to decouple all the modules to make them more reusable and extendable. In addition, the current organisation was not very intuitive. * Move registrar to its own package * Improve startup handling of filebeat * Rename state to states in registrar as this is more accurate * Add warn message for truncated files. See https://github.com/elastic/beats/pull/1882/files#r67523587 * Move ignore older to ProspectorLog as only used there * Remove Prospector reference from ProspectorStdin * Remove empty filebeat test file * Cleanup New function naming * Implement defer statements to stop running services * Clean up crawler / prospector stopping --- filebeat/beater/filebeat.go | 90 ++++++++++---------- filebeat/beater/filebeat_test.go | 3 - filebeat/crawler/crawler.go | 38 ++++++--- filebeat/crawler/crawler_test.go | 7 +- filebeat/harvester/log.go | 2 +- filebeat/prospector/prospector.go | 20 +---- filebeat/prospector/prospector_log.go | 19 ++++- filebeat/prospector/prospector_stdin.go | 11 ++- filebeat/{beater => publish}/publish.go | 8 +- filebeat/{beater => publish}/publish_test.go | 4 +- filebeat/{crawler => registrar}/registrar.go | 37 +++++--- filebeat/{beater => spooler}/spooler.go | 10 +-- filebeat/{beater => spooler}/spooler_test.go | 11 ++- 13 files changed, 139 insertions(+), 121 deletions(-) delete mode 100644 filebeat/beater/filebeat_test.go rename filebeat/{beater => publish}/publish.go (98%) rename filebeat/{beater => publish}/publish_test.go (95%) rename filebeat/{crawler => registrar}/registrar.go (86%) rename filebeat/{beater => spooler}/spooler.go (97%) rename filebeat/{beater => spooler}/spooler_test.go (84%) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 47f1beca0a8..a296275b85c 100644 --- a/filebeat/beater/filebeat.go +++ 
b/filebeat/beater/filebeat.go @@ -9,18 +9,15 @@ import ( cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/crawler" "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/publish" + "github.com/elastic/beats/filebeat/registrar" + "github.com/elastic/beats/filebeat/spooler" ) // Filebeat is a beater object. Contains all objects needed to run the beat type Filebeat struct { - FbConfig *cfg.Config - // Channel from harvesters to spooler - publisherChan chan []*input.FileEvent - spooler *Spooler - registrar *crawler.Registrar - crawler *crawler.Crawler - pub logPublisher - done chan struct{} + config *cfg.Config + done chan struct{} } // New creates a new Filebeat pointer instance. @@ -32,14 +29,13 @@ func New() *Filebeat { func (fb *Filebeat) Config(b *beat.Beat) error { // Load Base config - err := b.RawConfig.Unpack(&fb.FbConfig) - + err := b.RawConfig.Unpack(&fb.config) if err != nil { return fmt.Errorf("Error reading config file: %v", err) } // Check if optional config_dir is set to fetch additional prospector config files - fb.FbConfig.FetchConfigs() + fb.config.FetchConfigs() return nil } @@ -47,7 +43,6 @@ func (fb *Filebeat) Config(b *beat.Beat) error { // Setup applies the minimum required setup to a new Filebeat instance for use. 
func (fb *Filebeat) Setup(b *beat.Beat) error { fb.done = make(chan struct{}) - return nil } @@ -55,50 +50,65 @@ func (fb *Filebeat) Setup(b *beat.Beat) error { func (fb *Filebeat) Run(b *beat.Beat) error { var err error - - // Init channels - fb.publisherChan = make(chan []*input.FileEvent, 1) + config := fb.config.Filebeat // Setup registrar to persist state - fb.registrar, err = crawler.NewRegistrar(fb.FbConfig.Filebeat.RegistryFile) + registrar, err := registrar.New(config.RegistryFile) if err != nil { logp.Err("Could not init registrar: %v", err) return err } - fb.crawler = &crawler.Crawler{ - Registrar: fb.registrar, + // Channel from harvesters to spooler + publisherChan := make(chan []*input.FileEvent, 1) + + // Publishes event to output + publisher := publish.New(config.PublishAsync, + publisherChan, registrar.Channel, b.Publisher.Connect()) + + // Init and Start spooler: Harvesters dump events into the spooler. + spooler, err := spooler.New(config, publisherChan) + if err != nil { + logp.Err("Could not init spooler: %v", err) + return err } - // Load the previous log file locations now, for use in prospector - err = fb.registrar.LoadState() + crawler, err := crawler.New(spooler, config.Prospectors) if err != nil { - logp.Err("Error loading state: %v", err) + logp.Err("Could not init crawler: %v", err) return err } - // Init and Start spooler: Harvesters dump events into the spooler. - fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan) + // The order of starting and stopping is important. Stopping is inverted to the starting order. + // The current order is: registrar, publisher, spooler, crawler + // That means, crawler is stopped first. 
+ // Start the registrar + err = registrar.Start() if err != nil { - logp.Err("Could not init spooler: %v", err) - return err + logp.Err("Could not start registrar: %v", err) } + // Stopping registrar will write last state + defer registrar.Stop() - fb.registrar.Start() - fb.spooler.Start() + // Start publisher + publisher.Start() + // Stopping publisher (might potentially drop items) + defer publisher.Stop() + + // Starting spooler + spooler.Start() + // Stopping spooler will flush items + defer spooler.Stop() - err = fb.crawler.Start(fb.FbConfig.Filebeat.Prospectors, fb.spooler.Channel) + err = crawler.Start(registrar.GetStates()) if err != nil { return err } + // Stop crawler -> stop prospectors -> stop harvesters + defer crawler.Stop() - // Publishes event to output - fb.pub = newPublisher(fb.FbConfig.Filebeat.PublishAsync, - fb.publisherChan, fb.registrar.Channel, b.Publisher.Connect()) - fb.pub.Start() - - // Blocks progressing + // Blocks progressing. As soon as channel is closed, all defer statements come into play <-fb.done return nil @@ -114,18 +124,6 @@ func (fb *Filebeat) Stop() { logp.Info("Stopping filebeat") - // Stop crawler -> stop prospectors -> stop harvesters - fb.crawler.Stop() - - // Stopping spooler will flush items - fb.spooler.Stop() - - // stopping publisher (might potentially drop items) - fb.pub.Stop() - - // Stopping registrar will write last state - fb.registrar.Stop() - // Stop Filebeat close(fb.done) } diff --git a/filebeat/beater/filebeat_test.go b/filebeat/beater/filebeat_test.go deleted file mode 100644 index 222b97e0b07..00000000000 --- a/filebeat/beater/filebeat_test.go +++ /dev/null @@ -1,3 +0,0 @@ -// +build !integration - -package beater diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index fac631445b8..36c8602c689 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -6,6 +6,7 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/prospector" + 
"github.com/elastic/beats/filebeat/spooler" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -23,27 +24,32 @@ import ( */ type Crawler struct { - // Registrar object to persist the state - Registrar *Registrar - prospectors []*prospector.Prospector - wg sync.WaitGroup + prospectors []*prospector.Prospector + wg sync.WaitGroup + spooler *spooler.Spooler + prospectorConfigs []*common.Config } -func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *input.FileEvent) error { +func New(spooler *spooler.Spooler, prospectorConfigs []*common.Config) (*Crawler, error) { if len(prospectorConfigs) == 0 { - return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.") + return nil, fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.") } - logp.Info("Loading Prospectors: %v", len(prospectorConfigs)) + return &Crawler{ + spooler: spooler, + prospectorConfigs: prospectorConfigs, + }, nil +} + +func (c *Crawler) Start(states input.States) error { - // Get existing states - states := *c.Registrar.state + logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs)) // Prospect the globs/paths given on the command line and launch harvesters - for _, prospectorConfig := range prospectorConfigs { + for _, prospectorConfig := range c.prospectorConfigs { - prospector, err := prospector.NewProspector(prospectorConfig, states, eventChan) + prospector, err := prospector.NewProspector(prospectorConfig, states, c.spooler.Channel) if err != nil { return fmt.Errorf("Error in initing prospector: %s", err) } @@ -66,19 +72,23 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu }(i, p) } - logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count()) + logp.Info("All prospectors are initialised and running with %d states to persist", states.Count()) return nil } 
func (c *Crawler) Stop() { logp.Info("Stopping Crawler") + stopProspector := func(p *prospector.Prospector) { + defer c.wg.Done() + p.Stop() + } logp.Info("Stopping %v prospectors", len(c.prospectors)) - for _, prospector := range c.prospectors { + for _, p := range c.prospectors { // Stop prospectors in parallel c.wg.Add(1) - go prospector.Stop(&c.wg) + go stopProspector(p) } c.wg.Wait() logp.Info("Crawler stopped") diff --git a/filebeat/crawler/crawler_test.go b/filebeat/crawler/crawler_test.go index e12dd87999f..fcbb2698a3e 100644 --- a/filebeat/crawler/crawler_test.go +++ b/filebeat/crawler/crawler_test.go @@ -5,17 +5,14 @@ package crawler import ( "testing" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/common" "github.com/stretchr/testify/assert" ) -func TestCrawlerStartError(t *testing.T) { - crawler := Crawler{} - channel := make(chan *input.FileEvent, 1) +func TestNewCrawlerNoProspectorsError(t *testing.T) { prospectorConfigs := []*common.Config{} - error := crawler.Start(prospectorConfigs, channel) + _, error := New(nil, prospectorConfigs) assert.Error(t, error) } diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index 38d7b460d8c..905ac78f6af 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -63,7 +63,7 @@ func (h *Harvester) Harvest() { ts, text, bytesRead, jsonFields, err := readLine(reader) if err != nil { if err == errFileTruncate { - logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path) + logp.Warn("File was truncated. 
Begin reading file from offset 0: %s", h.Path) h.SetOffset(0) return } diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 45c456ab18a..e586e6b8a14 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -130,11 +130,10 @@ func (p *Prospector) Run() { } } -func (p *Prospector) Stop(wg *sync.WaitGroup) { +func (p *Prospector) Stop() { logp.Info("Stopping Prospector") close(p.done) p.wg.Wait() - wg.Done() } // createHarvester creates a new harvester instance from the given state @@ -169,20 +168,3 @@ func (p *Prospector) startHarvester(state input.FileState, offset int64) (*harve return h, nil } - -// isIgnoreOlder checks if the given state reached ignore_older -func (p *Prospector) isIgnoreOlder(state input.FileState) bool { - - // ignore_older is disable - if p.config.IgnoreOlder == 0 { - return false - } - - modTime := state.Fileinfo.ModTime() - - if time.Since(modTime) > p.config.IgnoreOlder { - return true - } - - return false -} diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 9fb458bada4..c9cce65ade7 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -132,7 +132,7 @@ func (p *ProspectorLog) scan() { // harvestNewFile harvest a new file func (p *ProspectorLog) harvestNewFile(state input.FileState) { - if !p.Prospector.isIgnoreOlder(state) { + if !p.isIgnoreOlder(state) { logp.Debug("prospector", "Start harvester for new file: %s", state.Source) p.Prospector.startHarvester(state, 0) } else { @@ -176,3 +176,20 @@ func (p *ProspectorLog) isFileExcluded(file string) bool { patterns := p.config.ExcludeFiles return len(patterns) > 0 && harvester.MatchAnyRegexps(patterns, file) } + +// isIgnoreOlder checks if the given state reached ignore_older +func (p *ProspectorLog) isIgnoreOlder(state input.FileState) bool { + + // ignore_older is disable + if p.config.IgnoreOlder == 0 { + return false + } + + modTime 
:= state.Fileinfo.ModTime() + + if time.Since(modTime) > p.config.IgnoreOlder { + return true + } + + return false +} diff --git a/filebeat/prospector/prospector_stdin.go b/filebeat/prospector/prospector_stdin.go index 26a97438240..c8fe696293c 100644 --- a/filebeat/prospector/prospector_stdin.go +++ b/filebeat/prospector/prospector_stdin.go @@ -8,16 +8,15 @@ import ( ) type ProspectorStdin struct { - Prospector *Prospector - harvester *harvester.Harvester - started bool + harvester *harvester.Harvester + started bool } +// NewProspectorStdin creates a new stdin prospector +// This prospector contains one harvester which is reading from stdin func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) { - prospectorer := &ProspectorStdin{ - Prospector: p, - } + prospectorer := &ProspectorStdin{} var err error diff --git a/filebeat/beater/publish.go b/filebeat/publish/publish.go similarity index 98% rename from filebeat/beater/publish.go rename to filebeat/publish/publish.go index 7fadc16d24f..f0f009b424b 100644 --- a/filebeat/beater/publish.go +++ b/filebeat/publish/publish.go @@ -1,4 +1,4 @@ -package beater +package publish import ( "sync" @@ -11,7 +11,7 @@ import ( "github.com/elastic/beats/libbeat/publisher" ) -type logPublisher interface { +type LogPublisher interface { Start() Stop() } @@ -61,11 +61,11 @@ const ( batchCanceled ) -func newPublisher( +func New( async bool, in, out chan []*input.FileEvent, client publisher.Client, -) logPublisher { +) LogPublisher { if async { return newAsyncLogPublisher(in, out, client) } diff --git a/filebeat/beater/publish_test.go b/filebeat/publish/publish_test.go similarity index 95% rename from filebeat/beater/publish_test.go rename to filebeat/publish/publish_test.go index 8a0ab6c8dee..f0ad101ec9c 100644 --- a/filebeat/beater/publish_test.go +++ b/filebeat/publish/publish_test.go @@ -1,6 +1,6 @@ // +build !integration -package beater +package publish import ( "fmt" @@ -47,7 +47,7 @@ func TestPublisherModes(t 
*testing.T) { regChan := make(chan []*input.FileEvent, len(test.order)+1) client := pubtest.NewChanClient(0) - pub := newPublisher(test.async, pubChan, regChan, client) + pub := New(test.async, pubChan, regChan, client) pub.Start() var events [][]*input.FileEvent diff --git a/filebeat/crawler/registrar.go b/filebeat/registrar/registrar.go similarity index 86% rename from filebeat/crawler/registrar.go rename to filebeat/registrar/registrar.go index f7f5ebb4195..f41e47d9496 100644 --- a/filebeat/crawler/registrar.go +++ b/filebeat/registrar/registrar.go @@ -1,4 +1,4 @@ -package crawler +package registrar import ( "encoding/json" @@ -20,16 +20,16 @@ type Registrar struct { Channel chan []*FileEvent done chan struct{} registryFile string // Path to the Registry File - state *input.States // Map with all file paths inside and the corresponding state + states *input.States // Map with all file paths inside and the corresponding state wg sync.WaitGroup } -func NewRegistrar(registryFile string) (*Registrar, error) { +func New(registryFile string) (*Registrar, error) { r := &Registrar{ registryFile: registryFile, done: make(chan struct{}), - state: input.NewStates(), + states: input.NewStates(), Channel: make(chan []*FileEvent, 1), wg: sync.WaitGroup{}, } @@ -62,9 +62,14 @@ func (r *Registrar) Init() error { return nil } -// loadState fetches the previous reading state from the configure RegistryFile file +// GetStates return the registrar states +func (r *Registrar) GetStates() input.States { + return *r.states +} + +// loadStates fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. 
-func (r *Registrar) LoadState() error { +func (r *Registrar) loadStates() error { // Check if files exists _, err := os.Stat(r.registryFile) @@ -97,7 +102,7 @@ func (r *Registrar) LoadState() error { states := []input.FileState{} decoder.Decode(&states) - r.state.SetStates(states) + r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) return nil @@ -134,7 +139,7 @@ func (r *Registrar) loadAndConvertOldState(file *os.File) bool { counter++ } - r.state.SetStates(states) + r.states.SetStates(states) // Rewrite registry in new format r.writeRegistry() @@ -144,9 +149,19 @@ func (r *Registrar) loadAndConvertOldState(file *os.File) bool { return true } -func (r *Registrar) Start() { +func (r *Registrar) Start() error { + + // Load the previous log file locations now, for use in prospector + err := r.loadStates() + if err != nil { + logp.Err("Error loading state: %v", err) + return err + } + r.wg.Add(1) go r.Run() + + return nil } func (r *Registrar) Run() { @@ -183,7 +198,7 @@ func (r *Registrar) processEventStates(events []*FileEvent) { if event.InputType == cfg.StdinInputType { continue } - r.state.Update(event.FileState) + r.states.Update(event.FileState) } } @@ -205,7 +220,7 @@ func (r *Registrar) writeRegistry() error { return e } - states := r.state.GetStates() + states := r.states.GetStates() encoder := json.NewEncoder(file) encoder.Encode(states) diff --git a/filebeat/beater/spooler.go b/filebeat/spooler/spooler.go similarity index 97% rename from filebeat/beater/spooler.go rename to filebeat/spooler/spooler.go index b050dfe4fb8..0a309dcc5bf 100644 --- a/filebeat/beater/spooler.go +++ b/filebeat/spooler/spooler.go @@ -1,4 +1,4 @@ -package beater +package spooler import ( "sync" @@ -29,12 +29,12 @@ type Spooler struct { wg sync.WaitGroup // WaitGroup used to control the shutdown. } -// NewSpooler creates and returns a new Spooler. The returned Spooler must be +// New creates and returns a new Spooler. 
The returned Spooler must be // started by calling Start before it can be used. -func NewSpooler( +func New( config cfg.FilebeatConfig, publisher chan<- []*input.FileEvent, -) *Spooler { +) (*Spooler, error) { spoolSize := config.SpoolSize if spoolSize <= 0 { spoolSize = cfg.DefaultSpoolSize @@ -55,7 +55,7 @@ func NewSpooler( nextFlushTime: time.Now().Add(idleTimeout), publisher: publisher, spool: make([]*input.FileEvent, 0, spoolSize), - } + }, nil } // Start starts the Spooler. Stop must be called to stop the Spooler. diff --git a/filebeat/beater/spooler_test.go b/filebeat/spooler/spooler_test.go similarity index 84% rename from filebeat/beater/spooler_test.go rename to filebeat/spooler/spooler_test.go index 5c5f9159552..a4895d59923 100644 --- a/filebeat/beater/spooler_test.go +++ b/filebeat/spooler/spooler_test.go @@ -1,6 +1,6 @@ // +build !integration -package beater +package spooler import ( "testing" @@ -30,8 +30,9 @@ func TestNewSpoolerDefaultConfig(t *testing.T) { config := load(t, "") // Read from empty yaml config - spooler := NewSpooler(config, nil) + spooler, err := New(config, nil) + assert.NoError(t, err) assert.Equal(t, cfg.DefaultSpoolSize, spooler.spoolSize) assert.Equal(t, cfg.DefaultIdleTimeout, spooler.idleTimeout) } @@ -39,14 +40,16 @@ func TestNewSpoolerDefaultConfig(t *testing.T) { func TestNewSpoolerSpoolSize(t *testing.T) { spoolSize := uint64(19) config := cfg.FilebeatConfig{SpoolSize: spoolSize} - spooler := NewSpooler(config, nil) + spooler, err := New(config, nil) + assert.NoError(t, err) assert.Equal(t, spoolSize, spooler.spoolSize) } func TestNewSpoolerIdleTimeout(t *testing.T) { config := load(t, "idle_timeout: 10s") - spooler := NewSpooler(config, nil) + spooler, err := New(config, nil) + assert.NoError(t, err) assert.Equal(t, time.Duration(10*time.Second), spooler.idleTimeout) }