diff --git a/beat/filebeat.go b/beat/filebeat.go index 78002eedc84..a480d0d1efd 100644 --- a/beat/filebeat.go +++ b/beat/filebeat.go @@ -116,7 +116,7 @@ func (fb *Filebeat) Stop() { fb.registrar.Stop() // Close channels - close(fb.publisherChan) + //close(fb.publisherChan) } func Publish(beat *beat.Beat, fb *Filebeat) { diff --git a/beat/spooler.go b/beat/spooler.go index 4817267fa82..3985d5d4461 100644 --- a/beat/spooler.go +++ b/beat/spooler.go @@ -103,7 +103,6 @@ func (s *Spooler) Run() { // Stop stops the spooler. Flushes events before stopping func (s *Spooler) Stop() { - s.running = false } // flush flushes all event and sends them to the publisher diff --git a/crawler/crawler.go b/crawler/crawler.go index 086e9b51e3f..260669475d7 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -79,5 +79,4 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i func (crawler *Crawler) Stop() { // TODO: Properly stop prospectors and harvesters - crawler.running = false } diff --git a/crawler/prospector.go b/crawler/prospector.go index 5c69bdc345f..0d63fc62e74 100644 --- a/crawler/prospector.go +++ b/crawler/prospector.go @@ -325,7 +325,6 @@ func (p *Prospector) checkExistingFile(newinfo *ProspectorFileStat, newFile *inp func (p *Prospector) Stop() { // TODO: Stopping is currently not implemented - p.running = false } // Check if the given file was renamed. If file is known but with different path, diff --git a/crawler/registrar.go b/crawler/registrar.go index 0aae5bc985d..7f4a373d844 100644 --- a/crawler/registrar.go +++ b/crawler/registrar.go @@ -20,13 +20,16 @@ type Registrar struct { // Channel used by the prospector and crawler to send FileStates to be persisted Persist chan *input.FileState running bool + Channel chan []*FileEvent + done chan struct{} } func NewRegistrar(registryFile string) (*Registrar, error) { r := &Registrar{ registryFile: registryFile, + done: make(chan struct{}), } err := r.Init() @@ -81,14 +84,22 @@ func (r *Registrar) Run() { r.running = true - // Writes registry + // Writes registry on shutdown defer r.writeRegistry() - for events := range r.Channel { + for { + var events []*FileEvent + select { + case <-r.done: + logp.Debug("registrar", "Ending Registrar") + return + case events = <-r.Channel: + } + logp.Debug("registrar", "Registrar: processing %d events", len(events)) + // Take the last event found for each file source for _, event := range events { - if !r.running { break } @@ -105,17 +116,13 @@ func (r *Registrar) Run() { // REVU: but we should panic, or something, right? logp.Err("Update of registry returned error: %v. Continuing..", e) } - - if !r.running { - break - } } - logp.Debug("registrar", "Ending Registrar") } func (r *Registrar) Stop() { r.running = false - close(r.Channel) + close(r.done) + // Note: don't block using waitGroup, cause this method is run by async signal handler } func (r *Registrar) GetFileState(path string) (*FileState, bool) {