Skip to content

Commit

Permalink
Fix race condition for harvester Start / Stop in registry
Browse files Browse the repository at this point in the history
There is still a case where a race condition could happen (see TODO). Registry must be improved to also handle this case.

This should not be backported as the issue can only happen on shutdown in very rare cases and should not have any side effect.

Closes #3779
  • Loading branch information
ruflin committed Apr 28, 2017
1 parent 6b01fa9 commit 8518b4d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
4 changes: 2 additions & 2 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Harvester struct {
encoding encoding.Encoding
done chan struct{}
stopOnce sync.Once
stopWg *sync.WaitGroup
StopWg *sync.WaitGroup
outlet Outlet
ID uuid.UUID
processors *processors.Processors
Expand All @@ -71,7 +71,7 @@ func NewHarvester(
state: state,
states: states,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
StopWg: &sync.WaitGroup{},
outlet: outlet,
ID: uuid.NewV4(),
}
Expand Down
5 changes: 2 additions & 3 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (h *Harvester) Harvest(r reader.Reader) {
harvesterStarted.Add(1)
harvesterRunning.Add(1)

h.stopWg.Add(1)
defer func() {
// Channel to stop internal harvester routines
h.stop()
Expand All @@ -64,7 +63,7 @@ func (h *Harvester) Harvest(r reader.Reader) {
harvesterRunning.Add(-1)

// Marks harvester stopping completed
h.stopWg.Done()
h.StopWg.Done()
}()

// Closes reader after timeout or when done channel is closed
Expand Down Expand Up @@ -164,7 +163,7 @@ func (h *Harvester) stop() {
// Stop stops harvester and waits for completion
func (h *Harvester) Stop() {
h.stop()
h.stopWg.Wait()
h.StopWg.Wait()
}

// sendEvent sends event to the spooler channel
Expand Down
27 changes: 25 additions & 2 deletions filebeat/prospector/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ type harvesterRegistry struct {
sync.Mutex
harvesters map[uuid.UUID]*harvester.Harvester
wg sync.WaitGroup
done chan struct{}
}

func newHarvesterRegistry() *harvesterRegistry {
return &harvesterRegistry{
harvesters: map[uuid.UUID]*harvester.Harvester{},
done: make(chan struct{}),
}
}

Expand All @@ -33,16 +35,21 @@ func (hr *harvesterRegistry) remove(h *harvester.Harvester) {
}

func (hr *harvesterRegistry) Stop() {

hr.Lock()
defer func() {
hr.Unlock()
hr.waitForCompletion()
}()
close(hr.done)
for _, hv := range hr.harvesters {
hr.wg.Add(1)
go func(h *harvester.Harvester) {
hr.wg.Done()
h.Stop()
}(hv)
}
hr.Unlock()
hr.waitForCompletion()

}

func (hr *harvesterRegistry) waitForCompletion() {
Expand All @@ -51,7 +58,23 @@ func (hr *harvesterRegistry) waitForCompletion() {

func (hr *harvesterRegistry) start(h *harvester.Harvester, r reader.Reader) {

// Make sure stop is not called during starting a harvester
hr.Lock()

// Make sure no new harvesters are started after stop was called
select {
case <-hr.done:
return
default:
}

hr.wg.Add(1)
h.StopWg.Add(1)

hr.Unlock()

// TODO: It could happen that stop is called here

hr.add(h)

// Update state before staring harvester
Expand Down

0 comments on commit 8518b4d

Please sign in to comment.