Skip to content

Commit

Permalink
Cleanup Filebeat publishers
Browse files Browse the repository at this point in the history
* Separate async and sync publishers into their own files
* Add Publish interface method which can be used to manually trigger publish (see elastic#2456)
* Add getDataEvents function
* Rename publish to publisher package
  • Loading branch information
ruflin committed Sep 5, 2016
1 parent 95df88a commit 7dac29e
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 118 deletions.
4 changes: 2 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/publish"
"github.com/elastic/beats/filebeat/publisher"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"
)
Expand Down Expand Up @@ -54,7 +54,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
publisherChan := make(chan []*input.Event, 1)

// Publishes event to output
publisher := publish.New(config.PublishAsync,
publisher := publisher.New(config.PublishAsync,
publisherChan, registrar.Channel, b.Publisher)

// Init and Start spooler: Harvesters dump events into the spooler.
Expand Down
142 changes: 27 additions & 115 deletions filebeat/publish/publish.go → filebeat/publisher/async.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
package publish
package publisher

import (
"expvar"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

type LogPublisher interface {
Start()
Stop()
}

type syncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in, out chan []*input.Event

done chan struct{}
wg sync.WaitGroup
}

type asyncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
Expand Down Expand Up @@ -64,84 +49,6 @@ const (
batchCanceled
)

var (
eventsSent = expvar.NewInt("publish.events")
)

func New(
async bool,
in, out chan []*input.Event,
pub publisher.Publisher,
) LogPublisher {
if async {
return newAsyncLogPublisher(in, out, pub)
}
return newSyncLogPublisher(in, out, pub)
}

func newSyncLogPublisher(
in, out chan []*input.Event,
pub publisher.Publisher,
) *syncLogPublisher {
return &syncLogPublisher{
in: in,
out: out,
pub: pub,
done: make(chan struct{}),
}
}

func (p *syncLogPublisher) Start() {
p.client = p.pub.Connect()

p.wg.Add(1)
go func() {
defer p.wg.Done()

logp.Info("Start sending events to output")

for {
var events []*input.Event
select {
case <-p.done:
return
case events = <-p.in:
}

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
// Only send event with bytes read. 0 Bytes means state update only
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}

ok := p.client.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)
if !ok {
// PublishEvents will only returns false, if p.client has been closed.
logp.Debug("publish", "Shutting down publisher")
return
}

logp.Debug("publish", "Events sent: %d", len(events))
eventsSent.Add(int64(len(events)))

// Tell the registrar that we've successfully sent these events
select {
case <-p.done:
return
case p.out <- events:
}
}
}()
}

func (p *syncLogPublisher) Stop() {
p.client.Close()
close(p.done)
p.wg.Wait()
}

func newAsyncLogPublisher(
in, out chan []*input.Event,
pub publisher.Publisher,
Expand All @@ -168,35 +75,40 @@ func (p *asyncLogPublisher) Start() {
ticker := time.NewTicker(defaultGCTimeout)

for {
err := p.Publish()
if err != nil {
return
}

select {
case <-p.done:
return
case events := <-p.in:

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}

batch := &eventsBatch{
flag: 0,
events: events,
}
p.client.PublishEvents(pubEvents,
publisher.Signal(batch), publisher.Guaranteed)

p.active.append(batch)

case <-ticker.C:
}
p.collect()

p.collect()
}
}
}()
}

// Publish waits for one batch of events on the input channel and hands the
// data-carrying events to the libbeat publisher pipeline asynchronously,
// using the batch itself as the completion signaler. It returns an error
// once the publisher has been stopped (done closed).
func (p *asyncLogPublisher) Publish() error {
select {
case <-p.done:
return errors.New("async publisher stopped")
case events := <-p.in:

// flag starts at 0, i.e. the batch is neither finished nor canceled yet;
// the publisher pipeline updates it via the Signal interface.
batch := &eventsBatch{
flag: 0,
events: events,
}
// Only data-carrying events are published; state-only updates stay in
// batch.events so downstream consumers still see the full batch.
p.client.PublishEvents(getDataEvents(events), publisher.Signal(batch), publisher.Guaranteed)

// Track the in-flight batch, then reap any batches that have completed.
// NOTE(review): collect()'s body is not visible here — presumably it
// forwards finished batches to the output/registrar; confirm in source.
p.active.append(batch)
p.collect()
}
return nil
}

func (p *asyncLogPublisher) Stop() {
p.client.Close()
close(p.done)
Expand Down
41 changes: 41 additions & 0 deletions filebeat/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package publisher

import (
"expvar"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
)

var (
eventsSent = expvar.NewInt("publish.events")
)

// LogPublisher is the common interface of the sync and async filebeat
// publishers created by New.
type LogPublisher interface {
// Start begins publishing events from the input channel.
Start()
// Stop shuts the publisher down.
Stop()
// Publish manually triggers one publish cycle; it returns an error
// once the publisher has been stopped.
Publish() error
}

// New creates a LogPublisher that reads event batches from in and reports
// successfully published batches on out. The asynchronous implementation is
// returned when async is true, the synchronous one otherwise.
func New(
	async bool,
	in, out chan []*input.Event,
	pub publisher.Publisher,
) LogPublisher {
	if !async {
		return newSyncLogPublisher(in, out, pub)
	}
	return newAsyncLogPublisher(in, out, pub)
}

// getDataEvents returns all events which contain data (not only state updates)
func getDataEvents(events []*input.Event) []common.MapStr {
dataEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
if event.HasData() {
dataEvents = append(dataEvents, event.ToMapStr())
}
}
return dataEvents
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !integration

package publish
package publisher

import (
"fmt"
Expand Down
83 changes: 83 additions & 0 deletions filebeat/publisher/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package publisher

import (
"errors"
"sync"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

// syncLogPublisher publishes event batches synchronously: each batch read
// from in is fully published before the next one is handled, and is then
// forwarded on out (consumed by the registrar — see Publish).
type syncLogPublisher struct {
pub publisher.Publisher // libbeat publisher the client is created from
client publisher.Client // connection established in Start, closed in Stop
in, out chan []*input.Event // in: batches to publish; out: successfully published batches

done chan struct{} // closed by Stop to signal shutdown
wg sync.WaitGroup // tracks the publishing goroutine started in Start
}

// newSyncLogPublisher builds a synchronous publisher reading batches from in
// and reporting published batches on out. The publisher client itself is not
// connected until Start is called.
func newSyncLogPublisher(
	in, out chan []*input.Event,
	pub publisher.Publisher,
) *syncLogPublisher {
	p := &syncLogPublisher{
		done: make(chan struct{}),
		pub:  pub,
		in:   in,
		out:  out,
	}
	return p
}

// Start connects to the libbeat publisher pipeline and launches the
// publishing loop in its own goroutine; Stop waits for that goroutine
// via the WaitGroup.
func (p *syncLogPublisher) Start() {
	p.client = p.pub.Connect()

	p.wg.Add(1)
	go p.run()
}

// run keeps publishing batches until Publish reports the shutdown error.
func (p *syncLogPublisher) run() {
	defer p.wg.Done()

	logp.Info("Start sending events to output")

	for p.Publish() == nil {
	}
	logp.Debug("publisher", "Shutting down sync publisher")
}

// Publish reads one batch of events from the input channel, publishes the
// data-carrying events synchronously with guaranteed delivery, and then
// forwards the whole batch (including state-only updates) on the output
// channel so the registrar can persist state. It returns a non-nil error
// once the publisher has been stopped or its client has been closed.
func (p *syncLogPublisher) Publish() error {
	var events []*input.Event
	select {
	case <-p.done:
		return errors.New("publishing was stopped")
	case events = <-p.in:
	}

	ok := p.client.PublishEvents(getDataEvents(events), publisher.Sync, publisher.Guaranteed)
	if !ok {
		// PublishEvents only returns false if p.client has been closed.
		return errors.New("publisher didn't publish events")
	}

	logp.Debug("publish", "Events sent: %d", len(events))
	eventsSent.Add(int64(len(events)))

	// Tell the registrar that we've successfully sent these events
	select {
	case <-p.done:
		return errors.New("publishing was stopped")
	case p.out <- events:
	}

	return nil
}

// Stop shuts the publisher down. Closing the client first makes a pending
// PublishEvents call return false (see Publish), closing done unblocks the
// channel selects, and Wait blocks until the publishing goroutine exits.
func (p *syncLogPublisher) Stop() {
p.client.Close()
close(p.done)
p.wg.Wait()
}

0 comments on commit 7dac29e

Please sign in to comment.