Commit
Merge pull request #782 from urso/enh/filebeat-async-spooler
Filebeat async publisher support
tsg committed Jan 25, 2016
2 parents a1d2411 + a6c2dc1 commit 7a2aeeb
Showing 17 changed files with 404 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Filebeat*
- Add multiline support for combining multiple related lines into one event. {issue}461[461]
- Add close_older configuration option to complete ignore_older https://github.com/elastic/filebeat/issues/181[181]
- Add experimental option to enable the filebeat publisher pipeline to operate asynchronously {pull}782[782]

*Winlogbeat*
- Added support for reading event logs using the Windows Event Log API {pull}576[576]
26 changes: 3 additions & 23 deletions filebeat/beat/filebeat.go
@@ -5,9 +5,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"

cfg "github.com/elastic/beats/filebeat/config"
. "github.com/elastic/beats/filebeat/crawler"
@@ -93,7 +91,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Publishes event to output
go Publish(b, fb)
pub := newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Events)
pub.Start()

// Blocks progressing
select {
@@ -123,23 +123,3 @@ func (fb *Filebeat) Stop() {
// Stop Filebeat
close(fb.done)
}

func Publish(beat *beat.Beat, fb *Filebeat) {
logp.Info("Start sending events to output")

// Receives events from spool during flush
for events := range fb.publisherChan {

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
pubEvents = append(pubEvents, event.ToMapStr())
}

beat.Events.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)

logp.Info("Events sent: %d", len(events))

// Tell the registrar that we've successfully sent these events
fb.registrar.Channel <- events
}
}
223 changes: 223 additions & 0 deletions filebeat/beat/publish.go
@@ -0,0 +1,223 @@
package beat

import (
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

type logPublisher interface {
Start()
Stop()
}

type syncLogPublisher struct {
client publisher.Client
in, out chan []*input.FileEvent

done chan struct{}
wg sync.WaitGroup
}

type asyncLogPublisher struct {
client publisher.Client
in, out chan []*input.FileEvent

// list of in-flight batches
active batchList
failing bool

done chan struct{}
wg sync.WaitGroup
}

// eventsBatch is used to store a sorted list of actively published log lines.
// It implements the `outputs.Signaler` interface for marking a batch as finished.
type eventsBatch struct {
next *eventsBatch
flag int32
events []*input.FileEvent
}

type batchList struct {
head, tail *eventsBatch
}

const (
defaultGCTimeout = 1 * time.Second
)

const (
batchSuccess int32 = 1
batchFailed int32 = 2
)

func newPublisher(
async bool,
in, out chan []*input.FileEvent,
client publisher.Client,
) logPublisher {
if async {
return newAsyncLogPublisher(in, out, client)
}
return newSyncLogPublisher(in, out, client)
}

func newSyncLogPublisher(
in, out chan []*input.FileEvent,
client publisher.Client,
) *syncLogPublisher {
return &syncLogPublisher{
in: in,
out: out,
client: client,
done: make(chan struct{}),
}
}

func (p *syncLogPublisher) Start() {
p.wg.Add(1)
go func() {
defer p.wg.Done()

logp.Info("Start sending events to output")

for {
var events []*input.FileEvent
select {
case <-p.done:
return
case events = <-p.in:
}

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
pubEvents = append(pubEvents, event.ToMapStr())
}

p.client.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)
logp.Info("Events sent: %d", len(events))

// Tell the registrar that we've successfully sent these events
select {
case <-p.done:
return
case p.out <- events:
}
}
}()
}

func (p *syncLogPublisher) Stop() {
close(p.done)
p.wg.Wait()
}

func newAsyncLogPublisher(
in, out chan []*input.FileEvent,
client publisher.Client,
) *asyncLogPublisher {
return &asyncLogPublisher{
in: in,
out: out,
client: client,
done: make(chan struct{}),
}
}

func (p *asyncLogPublisher) Start() {
p.wg.Add(1)
go func() {
defer p.wg.Done()

logp.Info("Start sending events to output")

// Short gc timer: even if no logs are received from the spooler, queued
// batches can still be cleaned up and forwarded to the registrar.
ticker := time.NewTicker(defaultGCTimeout)

for {
select {
case <-p.done:
return
case events := <-p.in:
pubEvents := make([]common.MapStr, len(events))
for i, event := range events {
pubEvents[i] = event.ToMapStr()
}

batch := &eventsBatch{
flag: 0,
events: events,
}
p.client.PublishEvents(pubEvents,
publisher.Signal(batch), publisher.Guaranteed)

p.active.append(batch)
case <-ticker.C:
}

p.collect()
}
}()
}

func (p *asyncLogPublisher) Stop() {
close(p.done)
p.wg.Wait()
}

// collect gathers finished batches in order and forwards them to the registrar.
// Reports to the registrar are guaranteed to arrive in the same order in which
// the batches were received from the spooler.
func (p *asyncLogPublisher) collect() bool {
for batch := p.active.head; batch != nil; batch = batch.next {
state := atomic.LoadInt32(&batch.flag)
if state == 0 && !p.failing {
break
}

// remove batch from active list
p.active.head = batch.next
if batch.next == nil {
p.active.tail = nil
}

// Batches get marked as failed, if publisher pipeline is shutting down
// In this case we do not want to send any more batches to the registrar
if state == batchFailed {
p.failing = true
}

if p.failing {
// if in failing state keep cleaning up queue
continue
}

// Tell the registrar that we've successfully sent these events
select {
case <-p.done:
return false
case p.out <- batch.events:
}
}
return true
}

func (b *eventsBatch) Completed() { atomic.StoreInt32(&b.flag, batchSuccess) }
func (b *eventsBatch) Failed() { atomic.StoreInt32(&b.flag, batchFailed) }

func (l *batchList) append(b *eventsBatch) {
if l.head == nil {
l.head = b
} else {
l.tail.next = b
}
b.next = nil
l.tail = b
}
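
The asynchronous publisher above hinges on one invariant: batches may be acknowledged out of order by the output, but they are reported to the registrar strictly in publish order, and only as a contiguous acknowledged prefix. A minimal, self-contained sketch of that pattern follows (the names ackBatch and ackList are illustrative only and are not part of the filebeat code; failure handling is omitted):

package main

import (
    "fmt"
    "sync/atomic"
)

// ackBatch mirrors eventsBatch: a list node with an atomically updated state flag.
type ackBatch struct {
    next *ackBatch
    flag int32 // 0 = pending, 1 = acknowledged
    id   int
}

// ackList is a FIFO of in-flight batches, like batchList above.
type ackList struct{ head, tail *ackBatch }

func (l *ackList) append(b *ackBatch) {
    if l.head == nil {
        l.head = b
    } else {
        l.tail.next = b
    }
    b.next = nil
    l.tail = b
}

// collect forwards the longest acknowledged prefix, preserving publish order.
func (l *ackList) collect() (done []int) {
    for b := l.head; b != nil && atomic.LoadInt32(&b.flag) == 1; b = b.next {
        done = append(done, b.id)
        l.head = b.next
        if b.next == nil {
            l.tail = nil
        }
    }
    return done
}

func main() {
    var l ackList
    batches := make([]*ackBatch, 3)
    for i := range batches {
        batches[i] = &ackBatch{id: i}
        l.append(batches[i])
    }

    atomic.StoreInt32(&batches[1].flag, 1) // batch 1 is ACKed before batch 0
    fmt.Println(l.collect())               // [] - nothing forwarded, batch 0 still pending
    atomic.StoreInt32(&batches[0].flag, 1)
    fmt.Println(l.collect())               // [0 1] - the contiguous acknowledged prefix
}
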
79 changes: 79 additions & 0 deletions filebeat/beat/publish_test.go
@@ -0,0 +1,79 @@
package beat

import (
"fmt"
"testing"
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"github.com/stretchr/testify/assert"
)

func makeEvents(name string, n int) []*input.FileEvent {
var events []*input.FileEvent
for i := 0; i < n; i++ {
event := &input.FileEvent{
ReadTime: time.Now(),
InputType: "log",
DocumentType: "log",
Bytes: 100,
Offset: int64(i),
Source: &name,
}
events = append(events, event)
}
return events
}

func TestPublisherModes(t *testing.T) {
tests := []struct {
title string
async bool
order []int
}{
{"sync", false, []int{1, 2, 3, 4, 5, 6}},
{"async ordered signal", true, []int{1, 2, 3, 4, 5, 6}},
{"async out of order signal", true, []int{5, 2, 3, 1, 4, 6}},
}

for i, test := range tests {
t.Logf("run publisher test (%v): %v", i, test.title)

pubChan := make(chan []*input.FileEvent, len(test.order)+1)
regChan := make(chan []*input.FileEvent, len(test.order)+1)
client := publisher.ExtChanClient{make(chan publisher.PublishMessage)}

pub := newPublisher(test.async, pubChan, regChan, client)
pub.Start()

var events [][]*input.FileEvent
for i := range test.order {
tmp := makeEvents(fmt.Sprintf("msg: %v", i), 1)
pubChan <- tmp
events = append(events, tmp)
}

var msgs []publisher.PublishMessage
for _ = range test.order {
m := <-client.Channel
msgs = append(msgs, m)
}

for _, i := range test.order {
outputs.SignalCompleted(msgs[i-1].Context.Signal)
}

var regEvents [][]*input.FileEvent
for _ = range test.order {
regEvents = append(regEvents, <-regChan)
}
pub.Stop()

// validate order
for i := range events {
assert.Equal(t, events[i], regEvents[i])
}
}
}
1 change: 1 addition & 0 deletions filebeat/config/config.go
@@ -37,6 +37,7 @@ type Config struct {
type FilebeatConfig struct {
Prospectors []ProspectorConfig
SpoolSize uint64 `yaml:"spool_size"`
PublishAsync bool `yaml:"publish_async"`
IdleTimeout string `yaml:"idle_timeout"`
IdleTimeoutDuration time.Duration
RegistryFile string `yaml:"registry_file"`
7 changes: 7 additions & 0 deletions filebeat/docs/configuration.asciidoc
@@ -313,6 +313,13 @@ filebeat:
-------------------------------------------------------------------------------------


===== publish_async

If enabled, the publisher pipeline in filebeat operates in async mode, preparing
a new batch of lines while waiting for the ACK of the previous one. This option can
improve load-balancing throughput at the cost of increased memory usage. The default
value is false.
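
For example (a minimal sketch; the prospector path is a placeholder and only
publish_async is specific to this change):

-------------------------------------------------------------------------------------
filebeat:
  prospectors:
    -
      paths:
        - "/var/log/*.log"
  spool_size: 2048
  publish_async: true
-------------------------------------------------------------------------------------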


===== idle_timeout

A duration string that specifies how often the spooler is flushed. After the
3 changes: 3 additions & 0 deletions filebeat/etc/beat.yml
@@ -151,6 +151,9 @@ filebeat:
# Event count spool threshold - forces network flush if exceeded
#spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
#publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# flushed even though spool_size is not reached.
#idle_timeout: 5s