Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for closing publisher.Client #1402

Merged
merged 14 commits into from
Apr 20, 2016
10 changes: 7 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Filebeat struct {
spooler *Spooler
registrar *crawler.Registrar
crawler *crawler.Crawler
pub logPublisher
done chan struct{}
}

Expand Down Expand Up @@ -92,9 +93,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Publishes event to output
pub := newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Events)
pub.Start()
fb.pub = newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Publisher.Connect())
fb.pub.Start()

// Blocks progressing
select {
Expand All @@ -119,6 +120,9 @@ func (fb *Filebeat) Stop() {
// Stopping spooler will flush items
fb.spooler.Stop()

// stopping publisher (might potentially drop items)
fb.pub.Stop()

// Stopping registrar will write last state
fb.registrar.Stop()

Expand Down
52 changes: 38 additions & 14 deletions filebeat/beater/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type asyncLogPublisher struct {
in, out chan []*input.FileEvent

// list of in-flight batches
active batchList
failing bool
active batchList
stopping bool

done chan struct{}
wg sync.WaitGroup
Expand All @@ -48,13 +48,17 @@ type batchList struct {
head, tail *eventsBatch
}

// batchStatus is the processing state of an in-flight batch. It is an int32
// so it can be stored in and loaded from a batch's flag with sync/atomic.
type batchStatus int32

const (
// defaultGCTimeout is a one second interval.
// NOTE(review): presumably the period at which completed batches are
// collected; the use site is not visible here - confirm.
defaultGCTimeout = 1 * time.Second
)

const (
batchSuccess int32 = 1
batchFailed int32 = 2
batchInProgress batchStatus = iota
batchSuccess
batchFailed
batchCanceled
)

func newPublisher(
Expand Down Expand Up @@ -115,6 +119,7 @@ func (p *syncLogPublisher) Start() {

// Stop shuts down the synchronous publisher. It closes p.done, closes the
// publisher client, and then blocks until p.wg reports completion.
// NOTE(review): p.wg presumably tracks the goroutine launched by Start - confirm.
func (p *syncLogPublisher) Stop() {
// Order matters: close done first, then the client, then wait.
close(p.done)
p.client.Close()
p.wg.Wait()
}

Expand Down Expand Up @@ -169,6 +174,7 @@ func (p *asyncLogPublisher) Start() {

// Stop shuts down the asynchronous publisher. It closes p.done (the stop
// signal that collect selects on), closes the publisher client, and then
// blocks until p.wg reports completion.
// NOTE(review): p.wg presumably tracks the goroutine launched by Start - confirm.
func (p *asyncLogPublisher) Stop() {
// Order matters: close done first, then the client, then wait.
close(p.done)
p.client.Close()
p.wg.Wait()
}

Expand All @@ -177,33 +183,46 @@ func (p *asyncLogPublisher) Stop() {
// as bulk-Events have been received by the spooler
func (p *asyncLogPublisher) collect() bool {
for batch := p.active.head; batch != nil; batch = batch.next {
state := atomic.LoadInt32(&batch.flag)
if state == 0 && !p.failing {
state := batchStatus(atomic.LoadInt32(&batch.flag))
if state == batchInProgress && !p.stopping {
break
}

if state == batchFailed {
// with guaranteed enabled this must not happen.
msg := "Failed to process batch"
logp.Critical(msg)
panic(msg)
}

// remove batch from active list
p.active.head = batch.next
if batch.next == nil {
p.active.tail = nil
}

// Batches get marked as failed, if publisher pipeline is shutting down
// Batches get marked as canceled, if publisher pipeline is shutting down
// In this case we do not want to send any more batches to the registrar
if state == batchFailed {
p.failing = true
if state == batchCanceled {
p.stopping = true
}

if p.failing {
logp.Warn("No registrar update for potentially published batch.")
if p.stopping {
logp.Info("Shutting down - No registrar update for potentially published batch.")

// if in failing state keep cleaning up queue
continue
}

// Tell the registrar that we've successfully sent these events
// Tell the registrar that we've successfully published the last batch of events.
// If registrar queue is blocking (quite unlikely), but stop signal has been
// received in the meantime (by closing p.done), we do not wait for
// registrar picking up the current batch. Instead prefer to shut-down and
// resend the last published batch on next restart, basically taking advantage
// of send-at-least-once semantics in order to speed up cleanup on shutdown.
select {
case <-p.done:
logp.Info("Shutting down - No registrar update for successfully published batch.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment 2 lines above says differently :-) Why don't we update the registrar here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're forwarding registrar updates via a queue. If the queue is blocking, but the stop signal has been sent (p.done being closed), we do not wait for the registrar to finish writing, but instead prefer to shut down and resend the same line on next restart. This is old behavior; I just added an info message notifying users that on shutdown the registrar will not be updated anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me, but we should probably update the comment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improved code comment

return false
case p.out <- batch.events:
}
Expand All @@ -212,12 +231,17 @@ func (p *asyncLogPublisher) collect() bool {
}

func (b *eventsBatch) Completed() {
atomic.StoreInt32(&b.flag, batchSuccess)
atomic.StoreInt32(&b.flag, int32(batchSuccess))
}

func (b *eventsBatch) Failed() {
logp.Err("Failed to publish batch. Stop updating registrar.")
atomic.StoreInt32(&b.flag, batchFailed)
atomic.StoreInt32(&b.flag, int32(batchFailed))
}

// Canceled marks the batch as canceled. It logs an informational message and
// then atomically stores batchCanceled into the batch's flag.
func (b *eventsBatch) Canceled() {
	logp.Info("In-flight batch has been canceled during shutdown")

	// The flag is a plain int32 so it can be used with sync/atomic.
	status := int32(batchCanceled)
	atomic.StoreInt32(&b.flag, status)
}

func (l *batchList) append(b *eventsBatch) {
Expand Down
10 changes: 5 additions & 5 deletions filebeat/beater/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/common/op"
pubtest "github.com/elastic/beats/libbeat/publisher/testing"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -45,7 +45,7 @@ func TestPublisherModes(t *testing.T) {

pubChan := make(chan []*input.FileEvent, len(test.order)+1)
regChan := make(chan []*input.FileEvent, len(test.order)+1)
client := publisher.ExtChanClient{make(chan publisher.PublishMessage)}
client := pubtest.NewChanClient(0)

pub := newPublisher(test.async, pubChan, regChan, client)
pub.Start()
Expand All @@ -57,14 +57,14 @@ func TestPublisherModes(t *testing.T) {
events = append(events, tmp)
}

var msgs []publisher.PublishMessage
var msgs []pubtest.PublishMessage
for _ = range test.order {
m := <-client.Channel
msgs = append(msgs, m)
}

for _, i := range test.order {
outputs.SignalCompleted(msgs[i-1].Context.Signal)
op.SigCompleted(msgs[i-1].Context.Signal)
}

var regEvents [][]*input.FileEvent
Expand Down
17 changes: 7 additions & 10 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,13 @@ type FlagsHandler interface {
// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Name string // Beat name.
Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version.
UUID uuid.UUID // ID assigned to a Beat instance.
BT Beater // Beater implementation.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Events publisher.Client // Client used for publishing events.
Publisher *publisher.PublisherType // Publisher
Name string // Beat name.
Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version.
UUID uuid.UUID // ID assigned to a Beat instance.
BT Beater // Beater implementation.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher *publisher.Publisher // Publisher

filters *filter.FilterList // Filters
}
Expand Down Expand Up @@ -234,8 +233,6 @@ func (bc *instance) setup() error {
}

bc.data.Publisher.RegisterFilter(bc.data.filters)
bc.data.Events = bc.data.Publisher.Client()

err = bc.beater.Setup(bc.data)
if err != nil {
return err
Expand Down
28 changes: 28 additions & 0 deletions libbeat/common/op/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package op

import "sync"

// Canceler is a one-shot cancellation signal. Interested parties wait on the
// channel returned by Done, which is closed once Cancel has been called.
type Canceler struct {
	lock   sync.RWMutex
	done   chan struct{}
	active bool // true until Cancel has been called
}

// NewCanceler returns an active Canceler whose Done channel is still open.
func NewCanceler() *Canceler {
	return &Canceler{
		done:   make(chan struct{}),
		active: true,
	}
}

// Cancel marks the Canceler as inactive and closes the Done channel.
//
// Cancel is idempotent and safe for concurrent use: only the first call
// closes the channel, later calls return without effect. Closing an already
// closed channel panics, so the close is guarded by the active flag while
// holding the lock.
func (c *Canceler) Cancel() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if !c.active {
		return
	}
	c.active = false
	close(c.done)
}

// Done returns a channel that is closed once Cancel has been called.
func (c *Canceler) Done() <-chan struct{} {
	return c.done
}
Loading