Skip to content

Commit

Permalink
Merge pull request #116 from urso/fix/panic-shutdown
Browse files Browse the repository at this point in the history
Fix/panic shutdown (temporary fix)
  • Loading branch information
ruflin committed Oct 20, 2015
2 parents 40d2e3c + cca7a25 commit 7054fd3
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 13 deletions.
2 changes: 1 addition & 1 deletion beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (fb *Filebeat) Stop() {
fb.registrar.Stop()

// Close channels
close(fb.publisherChan)
//close(fb.publisherChan)
}

func Publish(beat *beat.Beat, fb *Filebeat) {
Expand Down
1 change: 0 additions & 1 deletion beat/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (s *Spooler) Run() {

// Stop stops the spooler. Flushes events before stopping
func (s *Spooler) Stop() {
s.running = false
}

// flush flushes all event and sends them to the publisher
Expand Down
1 change: 0 additions & 1 deletion crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,4 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i

func (crawler *Crawler) Stop() {
// TODO: Properly stop prospectors and harvesters
crawler.running = false
}
1 change: 0 additions & 1 deletion crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ func (p *Prospector) checkExistingFile(newinfo *ProspectorFileStat, newFile *inp

func (p *Prospector) Stop() {
// TODO: Stopping is currently not implemented
p.running = false
}

// Check if the given file was renamed. If file is known but with different path,
Expand Down
25 changes: 16 additions & 9 deletions crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ type Registrar struct {
// Channel used by the prospector and crawler to send FileStates to be persisted
Persist chan *input.FileState
running bool

Channel chan []*FileEvent
done chan struct{}
}

func NewRegistrar(registryFile string) (*Registrar, error) {

r := &Registrar{
registryFile: registryFile,
done: make(chan struct{}),
}
err := r.Init()

Expand Down Expand Up @@ -81,14 +84,22 @@ func (r *Registrar) Run() {

r.running = true

// Writes registry
// Writes registry on shutdown
defer r.writeRegistry()

for events := range r.Channel {
for {
var events []*FileEvent
select {
case <-r.done:
logp.Debug("registrar", "Ending Registrar")
return
case events = <-r.Channel:
}

logp.Debug("registrar", "Registrar: processing %d events", len(events))

// Take the last event found for each file source
for _, event := range events {

if !r.running {
break
}
Expand All @@ -105,17 +116,13 @@ func (r *Registrar) Run() {
// REVU: but we should panic, or something, right?
logp.Err("Update of registry returned error: %v. Continuing..", e)
}

if !r.running {
break
}
}
logp.Debug("registrar", "Ending Registrar")
}

func (r *Registrar) Stop() {
r.running = false
close(r.Channel)
close(r.done)
// Note: don't block using waitGroup, cause this method is run by async signal handler
}

func (r *Registrar) GetFileState(path string) (*FileState, bool) {
Expand Down

0 comments on commit 7054fd3

Please sign in to comment.