Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update metricbeat to use new publisher API #4699

Merged
merged 7 commits into from
Jul 19, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 15 additions & 44 deletions libbeat/publisher/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,43 @@ package testing

// ChanClient implements Client interface, forwarding published events to some
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
)

type TestPublisher struct {
client publisher.Client
client beat.Client
}

// given channel only.
type ChanClient struct {
done chan struct{}
Channel chan PublishMessage

recvBuf []common.MapStr
}

type PublishMessage struct {
Context publisher.Context
Events []common.MapStr
Channel chan beat.Event
}

func PublisherWithClient(client publisher.Client) publisher.Publisher {
func PublisherWithClient(client beat.Client) publisher.Publisher {
return &TestPublisher{client}
}

func (pub *TestPublisher) Connect() publisher.Client {
return pub.client
panic("Not supported")
}

func (pub *TestPublisher) ConnectX(_ beat.ClientConfig) (beat.Client, error) {
panic("Not supported")
return pub.client, nil
}

func (pub *TestPublisher) SetACKHandler(_ beat.PipelineACKHandler) error {
panic("Not supported")
}

func NewChanClient(bufSize int) *ChanClient {
return NewChanClientWith(make(chan PublishMessage, bufSize))
return NewChanClientWith(make(chan beat.Event, bufSize))
}

func NewChanClientWith(ch chan PublishMessage) *ChanClient {
func NewChanClientWith(ch chan beat.Event) *ChanClient {
if ch == nil {
ch = make(chan PublishMessage, 1)
ch = make(chan beat.Event, 1)
}
c := &ChanClient{
done: make(chan struct{}),
Expand All @@ -62,40 +54,19 @@ func (c *ChanClient) Close() error {

// PublishEvent will publish the event on the channel. Options will be ignored.
// Always returns true.
func (c *ChanClient) PublishEvent(event common.MapStr, opts ...publisher.ClientOption) bool {
return c.PublishEvents([]common.MapStr{event}, opts...)
}

// PublishEvents publishes all event on the configured channel. Options will be ignored.
// Always returns true.
func (c *ChanClient) PublishEvents(events []common.MapStr, opts ...publisher.ClientOption) bool {
_, ctx := publisher.MakeContext(opts)
msg := PublishMessage{ctx, events}
func (c *ChanClient) Publish(event beat.Event) {
select {
case <-c.done:
return false
case c.Channel <- msg:
return true
case c.Channel <- event:
}
}

func (c *ChanClient) ReceiveEvent() common.MapStr {
if len(c.recvBuf) > 0 {
evt := c.recvBuf[0]
c.recvBuf = c.recvBuf[1:]
return evt
func (c *ChanClient) PublishAll(event []beat.Event) {
for _, e := range event {
c.Publish(e)
}

msg := <-c.Channel
c.recvBuf = msg.Events
return c.ReceiveEvent()
}

func (c *ChanClient) ReceiveEvents() []common.MapStr {
if len(c.recvBuf) > 0 {
return c.recvBuf
}

msg := <-c.Channel
return msg.Events
func (c *ChanClient) ReceiveEvent() beat.Event {
return <-c.Channel
}
16 changes: 10 additions & 6 deletions libbeat/publisher/testing/testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/stretchr/testify/assert"
)

var cnt = 0

func testEvent() common.MapStr {
event := common.MapStr{}
event["message"] = "test"
event["idx"] = cnt
func testEvent() beat.Event {
event := beat.Event{
Fields: common.MapStr{
"message": "test",
"idx": cnt,
},
}
cnt++
return event
}
Expand All @@ -21,7 +25,7 @@ func testEvent() common.MapStr {
func TestChanClientPublishEvent(t *testing.T) {
cc := NewChanClient(1)
e1 := testEvent()
cc.PublishEvent(e1)
cc.Publish(e1)
assert.Equal(t, e1, cc.ReceiveEvent())
}

Expand All @@ -30,7 +34,7 @@ func TestChanClientPublishEvents(t *testing.T) {
cc := NewChanClient(1)

e1, e2 := testEvent(), testEvent()
cc.PublishEvents([]common.MapStr{e1, e2})
go cc.PublishAll([]beat.Event{e1, e2})
assert.Equal(t, e1, cc.ReceiveEvent())
assert.Equal(t, e2, cc.ReceiveEvent())
}
64 changes: 52 additions & 12 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"
"github.com/joeshaw/multierror"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/pkg/errors"
Expand All @@ -19,12 +19,16 @@ import (

// Metricbeat implements the Beater interface for metricbeat.
type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown.
modules []*module.Wrapper // Active list of modules.
client publisher.Client // Publisher client.
done chan struct{} // Channel used to initiate shutdown.
modules []staticModule // Active list of modules.
config Config
}

type staticModule struct {
connector *module.Connector
module *module.Wrapper
}

// New creates and returns a new Metricbeat instance.
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// List all registered modules and metricsets.
Expand All @@ -35,14 +39,45 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, errors.Wrap(err, "error reading configuration file")
}

modules, err := module.NewWrappers(config.MaxStartDelay, config.Modules, mb.Registry)
if err != nil {
// Empty config is fine if dynamic config is enabled
if !config.ConfigModules.Enabled() {
return nil, err
} else if err != mb.ErrEmptyConfig && err != mb.ErrAllModulesDisabled {
return nil, err
dynamicCfgEnabled := config.ConfigModules.Enabled()
if !dynamicCfgEnabled && len(config.Modules) == 0 {
return nil, mb.ErrEmptyConfig
}

var errs multierror.Errors
var modules []staticModule
for _, moduleCfg := range config.Modules {
if !moduleCfg.Enabled() {
continue
}

failed := false
connector, err := module.NewConnector(b.Publisher, moduleCfg)
if err != nil {
errs = append(errs, err)
failed = true
}

module, err := module.NewWrapper(config.MaxStartDelay, moduleCfg, mb.Registry)
if err != nil {
errs = append(errs, err)
failed = true
}

if failed {
continue
}
modules = append(modules, staticModule{
connector: connector,
module: module,
})
}

if err := errs.Err(); err != nil {
return nil, err
}
if len(modules) == 0 && !dynamicCfgEnabled {
return nil, mb.ErrAllModulesDisabled
}

mb := &Metricbeat{
Expand All @@ -62,7 +97,12 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
var wg sync.WaitGroup

for _, m := range bt.modules {
r := module.NewRunner(b.Publisher.Connect, m)
client, err := m.connector.Connect()
if err != nil {
return err
}

r := module.NewRunner(client, m.module)
r.Start()
wg.Add(1)
go func() {
Expand Down
Loading