Skip to content

Commit

Permalink
Merge pull request #1075 from McStork/libbeat-proper-shut
Browse files Browse the repository at this point in the history
Ensure proper shutdown of libbeat
  • Loading branch information
Steffen Siering committed Mar 4, 2016
2 parents 34cbe9d + 7f16e11 commit 6d6cd52
Show file tree
Hide file tree
Showing 24 changed files with 302 additions and 146 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
- Add experimental Kafka output. {pull}942[942]
- Add config file option to configure GOMAXPROCS. {pull}969[969]
- Added `fields` and `fields_under_root` options under the `shipper` configuration {pull}1092[1092]
- Ensure proper shutdown of libbeat. {pull}1075[1075]

*Packetbeat*
- Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803]
Expand Down
45 changes: 45 additions & 0 deletions libbeat/common/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package common

import (
"sync"
)

// WorkerSignal coordinates a graceful shutdown between event producers
// and worker goroutines: Stop first waits until every in-flight event
// has been accounted for, then asks the workers to exit by closing Done
// and waits for them to finish.
type WorkerSignal struct {
	Done chan struct{} // closed by Stop to ask worker goroutines to exit

	wgEvent  sync.WaitGroup // tracks in-flight events
	wgWorker sync.WaitGroup // tracks running worker goroutines
}

// NewWorkerSignal allocates and initializes a ready-to-use WorkerSignal.
func NewWorkerSignal() *WorkerSignal {
	s := new(WorkerSignal)
	s.Init()
	return s
}

// Init (re)creates the Done channel. A zero-value WorkerSignal must be
// initialized with Init before use; NewWorkerSignal does this for you.
func (s *WorkerSignal) Init() {
	s.Done = make(chan struct{})
}

// AddEvent records delta additional in-flight events.
func (s *WorkerSignal) AddEvent(delta int) {
	s.wgEvent.Add(delta)
}

// DoneEvent marks one in-flight event as fully handled.
func (s *WorkerSignal) DoneEvent() {
	s.wgEvent.Done()
}

// WorkerStart registers one worker goroutine.
func (s *WorkerSignal) WorkerStart() {
	s.wgWorker.Add(1)
}

// WorkerFinished deregisters one worker goroutine.
func (s *WorkerSignal) WorkerFinished() {
	s.wgWorker.Done()
}

// Stop runs the shutdown sequence: wait for all events to be dealt
// with, close Done to ask the worker goroutines to exit, then wait for
// them to finish.
func (s *WorkerSignal) Stop() {
	s.wgEvent.Wait()
	close(s.Done)
	s.wgWorker.Wait()
}
5 changes: 5 additions & 0 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func writeBuffer(buf []byte) error {
return nil
}

// Close implements the Outputer interface. The console output holds no
// resources that need releasing here, so Close is a no-op.
func (c *console) Close() error {
	return nil
}

func (c *console) PublishEvent(
s outputs.Signaler,
opts outputs.Options,
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func makeClientFactory(
}
}

// Close implements the Outputer interface by delegating shutdown to the
// output's connection mode.
func (out *elasticsearchOutput) Close() error {
	return out.mode.Close()
}

func (out *elasticsearchOutput) PublishEvent(
signaler outputs.Signaler,
opts outputs.Options,
Expand Down
5 changes: 5 additions & 0 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (out *fileOutput) init(config config) error {
return nil
}

// Close implements the Outputer interface. This output releases nothing
// on shutdown, so Close is a no-op.
func (out *fileOutput) Close() error {
	return nil
}

func (out *fileOutput) PublishEvent(
trans outputs.Signaler,
opts outputs.Options,
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (k *kafka) init(cfg *ucfg.Config) error {
return nil
}

// Close implements the Outputer interface by delegating shutdown to the
// output's connection mode.
func (k *kafka) Close() error {
	return k.mode.Close()
}

func (k *kafka) PublishEvent(
signal outputs.Signaler,
opts outputs.Options,
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func makeTLSClient(port int, tls *tls.Config) func(string) (TransportClient, err
}
}

// Close implements the Outputer interface by delegating shutdown to the
// output's connection mode.
func (lj *logstash) Close() error {
	return lj.mode.Close()
}

// TODO: update Outputer interface to support multiple events for batch-like
// processing (e.g. for filebeat). Batch like processing might reduce
// send/receive overhead per event for other implementors too.
Expand Down
38 changes: 18 additions & 20 deletions libbeat/outputs/mode/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ type LoadBalancerMode struct {
// block until event has been successfully published.
maxAttempts int

// waitGroup + signaling channel for handling shutdown
wg sync.WaitGroup
done chan struct{}
// WorkerSignal for handling events and a clean shutdown
ws common.WorkerSignal

// channels for forwarding work items to workers.
// The work channel is used by publisher to insert new events
Expand Down Expand Up @@ -83,18 +82,16 @@ func NewLoadBalancerMode(

work: make(chan eventsMessage),
retries: make(chan eventsMessage, len(clients)*2),
done: make(chan struct{}),
}
m.ws.Init()
m.start(clients)

return m, nil
}

// Close stops all workers and closes all open connections. In flight events
// are signaled as failed.
// Close waits for the workers to end and connections to close.
func (m *LoadBalancerMode) Close() error {
close(m.done)
m.wg.Wait()
m.ws.Stop()
return nil
}

Expand Down Expand Up @@ -128,8 +125,9 @@ func (m *LoadBalancerMode) publishEventsMessage(
}
msg.attemptsLeft = maxAttempts

m.ws.AddEvent(1)
if ok := m.forwardEvent(m.work, msg); !ok {
dropping(msg)
dropping(msg, &m.ws)
}
return nil
}
Expand All @@ -141,15 +139,15 @@ func (m *LoadBalancerMode) start(clients []ProtocolClient) {
if client.IsConnected() {
_ = client.Close()
}
m.wg.Done()
m.ws.WorkerFinished()
}()

waitStart.Done()
m.clientLoop(client)
}

for _, client := range clients {
m.wg.Add(1)
m.ws.WorkerStart()
waitStart.Add(1)
go worker(client)
}
Expand All @@ -160,7 +158,7 @@ func (m *LoadBalancerMode) clientLoop(client ProtocolClient) {
debug("load balancer: start client loop")
defer debug("load balancer: stop client loop")

backoff := newBackoff(m.done, m.waitRetry, m.maxWaitRetry)
backoff := newBackoff(m.ws.Done, m.waitRetry, m.maxWaitRetry)

done := false
for !done {
Expand Down Expand Up @@ -190,7 +188,7 @@ func (m *LoadBalancerMode) sendLoop(client ProtocolClient, backoff *backoff) boo
for {
var msg eventsMessage
select {
case <-m.done:
case <-m.ws.Done:
return true
case msg = <-m.retries: // receive message from other failed worker
case msg = <-m.work: // receive message from publisher
Expand All @@ -208,7 +206,6 @@ func (m *LoadBalancerMode) onMessage(
client ProtocolClient,
msg eventsMessage,
) (bool, error) {

done := false
if msg.event != nil {
err := client.PublishEvent(msg.event)
Expand Down Expand Up @@ -254,7 +251,7 @@ func (m *LoadBalancerMode) onMessage(

if m.maxAttempts > 0 && msg.attemptsLeft == 0 {
// no more attempts left => drop
dropping(msg)
dropping(msg, &m.ws)
return done, err
}

Expand All @@ -265,15 +262,15 @@ func (m *LoadBalancerMode) onMessage(
}

outputs.SignalCompleted(msg.signaler)
m.ws.DoneEvent()
return done, nil
}

func (m *LoadBalancerMode) onFail(msg eventsMessage, err error) {

logp.Info("Error publishing events (retrying): %s", err)

if !m.forwardEvent(m.retries, msg) {
dropping(msg)
dropping(msg, &m.ws)
}
}

Expand All @@ -285,15 +282,15 @@ func (m *LoadBalancerMode) forwardEvent(
select {
case ch <- msg:
return true
case <-m.done: // shutdown
case <-m.ws.Done: // shutdown
return false
}
} else {
for ; msg.attemptsLeft > 0; msg.attemptsLeft-- {
select {
case ch <- msg:
return true
case <-m.done: // shutdown
case <-m.ws.Done: // shutdown
return false
case <-time.After(m.timeout):
}
Expand All @@ -304,8 +301,9 @@ func (m *LoadBalancerMode) forwardEvent(

// dropping is called when a message is dropped. It updates the
// relevant counters and sends a failed signal.
func dropping(msg eventsMessage) {
func dropping(msg eventsMessage, ws *common.WorkerSignal) {
	debug("messages dropped")
	messagesDropped.Add(1)
	outputs.SignalFailed(msg.signaler, nil)
	// Balance the AddEvent made when this message entered the pipeline,
	// so WorkerSignal.Stop does not block forever on dropped events.
	ws.DoneEvent()
}
33 changes: 17 additions & 16 deletions libbeat/outputs/mode/balance_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ type AsyncLoadBalancerMode struct {
// block until event has been successfully published.
maxAttempts int

// waitGroup + signaling channel for handling shutdown
wg sync.WaitGroup
done chan struct{}
// WorkerSignal for handling events and a clean shutdown
ws common.WorkerSignal

// channels for forwarding work items to workers.
// The work channel is used by publisher to insert new events
Expand Down Expand Up @@ -80,8 +79,8 @@ func NewAsyncLoadBalancerMode(

work: make(chan eventsMessage),
retries: make(chan eventsMessage, len(clients)*2),
done: make(chan struct{}),
}
m.ws.Init()
m.start(clients)

return m, nil
Expand All @@ -90,8 +89,7 @@ func NewAsyncLoadBalancerMode(
// Close stops all workers and closes all open connections. In flight events
// are signaled as failed.
func (m *AsyncLoadBalancerMode) Close() error {
close(m.done)
m.wg.Wait()
m.ws.Stop()
return nil
}

Expand Down Expand Up @@ -129,8 +127,9 @@ func (m *AsyncLoadBalancerMode) publishEventsMessage(
msg.attemptsLeft = maxAttempts
debug("publish events with attempts=%v", msg.attemptsLeft)

m.ws.AddEvent(1)
if ok := m.forwardEvent(m.work, msg); !ok {
dropping(msg)
dropping(msg, &m.ws)
}
return nil
}
Expand All @@ -142,12 +141,12 @@ func (m *AsyncLoadBalancerMode) start(clients []AsyncProtocolClient) {
if client.IsConnected() {
_ = client.Close()
}
m.wg.Done()
m.ws.WorkerFinished()
}()

waitStart.Done()

backoff := newBackoff(m.done, m.waitRetry, m.maxWaitRetry)
backoff := newBackoff(m.ws.Done, m.waitRetry, m.maxWaitRetry)
for {
// reconnect loop
for !client.IsConnected() {
Expand All @@ -163,7 +162,7 @@ func (m *AsyncLoadBalancerMode) start(clients []AsyncProtocolClient) {
// receive and process messages
var msg eventsMessage
select {
case <-m.done:
case <-m.ws.Done:
return
case msg = <-m.retries: // receive message from other failed worker
debug("events from retries queue")
Expand All @@ -179,7 +178,7 @@ func (m *AsyncLoadBalancerMode) start(clients []AsyncProtocolClient) {
}

for _, client := range clients {
m.wg.Add(1)
m.ws.WorkerStart()
waitStart.Add(1)
go worker(client)
}
Expand Down Expand Up @@ -220,6 +219,7 @@ func handlePublishEventResult(m *AsyncLoadBalancerMode, msg eventsMessage) func(
m.onFail(false, msg, err)
} else {
outputs.SignalCompleted(msg.signaler)
m.ws.DoneEvent()
}
}
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func handlePublishEventsResult(

if m.maxAttempts > 0 && msg.attemptsLeft == 0 {
// no more attempts left => drop
dropping(msg)
dropping(msg, &m.ws)
return
}

Expand All @@ -268,14 +268,15 @@ func handlePublishEventsResult(
debug("add non-published events back into pipeline: %v", len(events))
msg.events = events
if ok := m.forwardEvent(m.retries, msg); !ok {
dropping(msg)
dropping(msg, &m.ws)
}
return
}

// all events published -> signal success
debug("async bulk publish success")
outputs.SignalCompleted(msg.signaler)
m.ws.DoneEvent()
}
}

Expand All @@ -284,7 +285,7 @@ func (m *AsyncLoadBalancerMode) onFail(async bool, msg eventsMessage, err error)
logp.Info("Error publishing events (retrying): %s", err)

if ok := m.forwardEvent(m.retries, msg); !ok {
dropping(msg)
dropping(msg, &m.ws)
}
}

Expand All @@ -306,7 +307,7 @@ func (m *AsyncLoadBalancerMode) forwardEvent(
case ch <- msg:
debug("message forwarded")
return true
case <-m.done: // shutdown
case <-m.ws.Done: // shutdown
debug("shutting down")
return false
}
Expand All @@ -316,7 +317,7 @@ func (m *AsyncLoadBalancerMode) forwardEvent(
case ch <- msg:
debug("message forwarded")
return true
case <-m.done: // shutdown
case <-m.ws.Done: // shutdown
debug("shutting down")
return false
case <-time.After(m.timeout):
Expand Down
Loading

0 comments on commit 6d6cd52

Please sign in to comment.