From 8518b4d0a8abd7952d290633cd155e6a10f90187 Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 21 Mar 2017 10:27:55 +0100 Subject: [PATCH] Fix race condition for harvester Start / Stop in registry There is still a case where a race condition could happen (see TODO). Registry must be improved to also handle this case. This should not be backported as the issue can only happen on shutdown in very rare cases and should not have any side effect. Closes https://github.com/elastic/beats/issues/3779 --- filebeat/harvester/harvester.go | 4 ++-- filebeat/harvester/log.go | 5 ++--- filebeat/prospector/registry.go | 27 +++++++++++++++++++++++++-- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index fcc64ec1538c..a3647d037199 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -53,7 +53,7 @@ type Harvester struct { encoding encoding.Encoding done chan struct{} stopOnce sync.Once - stopWg *sync.WaitGroup + StopWg *sync.WaitGroup outlet Outlet ID uuid.UUID processors *processors.Processors @@ -71,7 +71,7 @@ func NewHarvester( state: state, states: states, done: make(chan struct{}), - stopWg: &sync.WaitGroup{}, + StopWg: &sync.WaitGroup{}, outlet: outlet, ID: uuid.NewV4(), } diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index d4bfe145498f..d786c9fbf7a4 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -54,7 +54,6 @@ func (h *Harvester) Harvest(r reader.Reader) { harvesterStarted.Add(1) harvesterRunning.Add(1) - h.stopWg.Add(1) defer func() { // Channel to stop internal harvester routines h.stop() @@ -64,7 +63,7 @@ func (h *Harvester) Harvest(r reader.Reader) { harvesterRunning.Add(-1) // Marks harvester stopping completed - h.stopWg.Done() + h.StopWg.Done() }() // Closes reader after timeout or when done channel is closed @@ -164,7 +163,7 @@ func (h *Harvester) stop() { // Stop stops harvester and waits for completion func (h *Harvester) Stop() { h.stop() - h.stopWg.Wait() + h.StopWg.Wait() } // sendEvent sends event to the spooler channel diff --git a/filebeat/prospector/registry.go b/filebeat/prospector/registry.go index 0f600f5d9f86..4f7ac00a6c0e 100644 --- a/filebeat/prospector/registry.go +++ b/filebeat/prospector/registry.go @@ -12,11 +12,13 @@ type harvesterRegistry struct { sync.Mutex harvesters map[uuid.UUID]*harvester.Harvester wg sync.WaitGroup + done chan struct{} } func newHarvesterRegistry() *harvesterRegistry { return &harvesterRegistry{ harvesters: map[uuid.UUID]*harvester.Harvester{}, + done: make(chan struct{}), } } @@ -33,7 +35,13 @@ func (hr *harvesterRegistry) remove(h *harvester.Harvester) { } func (hr *harvesterRegistry) Stop() { + hr.Lock() + defer func() { + hr.Unlock() + hr.waitForCompletion() + }() + close(hr.done) for _, hv := range hr.harvesters { hr.wg.Add(1) go func(h *harvester.Harvester) { @@ -41,8 +49,7 @@ func (hr *harvesterRegistry) Stop() { h.Stop() }(hv) } - hr.Unlock() - hr.waitForCompletion() + } func (hr *harvesterRegistry) waitForCompletion() { @@ -51,7 +58,23 @@ func (hr *harvesterRegistry) waitForCompletion() { func (hr *harvesterRegistry) start(h *harvester.Harvester, r reader.Reader) { + // Make sure stop is not called during starting a harvester + hr.Lock() + + // Make sure no new harvesters are started after stop was called + select { + case <-hr.done: + return + default: + } + hr.wg.Add(1) + h.StopWg.Add(1) + + hr.Unlock() + + // TODO: It could happen that stop is called here + hr.add(h) // Update state before staring harvester