Skip to content

Commit

Permalink
Publisher pipeline: pass logger and metrics registry (#8091)
Browse files Browse the repository at this point in the history
We should strive to not have dependencies on globals in beats. The
publisher pipeline rewrite made sure we don't work with globals
internally. Yet some globals have been introduced since, and even though
the library didn't use globals internally, initialization still did use
globals at some points.
This change removes globals for logging/metrics/telemetry, by requiring
the beat instance to pass down required instances.
  • Loading branch information
Steffen Siering authored Aug 29, 2018
1 parent e5f16e2 commit 8c15e6e
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Count HTTP 429 responses in the elasticsearch output {pull}8056[8056]
- Report configured queue type. {pull}8091[8091]

*Auditbeat*

Expand Down
9 changes: 8 additions & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}

debugf("Initializing output plugins")
pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output)
pipeline, err := pipeline.Load(b.Info,
pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}
Expand Down
39 changes: 25 additions & 14 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
)

type reporter struct {
done *stopper
done *stopper
logger *logp.Logger

checkRetry time.Duration

Expand All @@ -57,7 +58,9 @@ type reporter struct {
out outputs.Group
}

var debugf = logp.MakeDebug("monitoring")
const selector = "monitoring"

var debugf = logp.MakeDebug(selector)

var errNoMonitoring = errors.New("xpack monitoring not available")

Expand All @@ -72,6 +75,8 @@ func init() {
}

func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
log := logp.L().Named(selector)

config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand All @@ -89,7 +94,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
return nil, err
}
if proxyURL != nil {
logp.Info("Using proxy URL: %s", proxyURL)
log.Infof("Using proxy URL: %s", proxyURL)
}
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
Expand Down Expand Up @@ -123,10 +128,11 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
return memqueue.NewBroker(log,
memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
}

monitoring := monitoring.Default.GetRegistry("xpack.monitoring")
Expand All @@ -149,6 +155,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
}

r := &reporter{
logger: log,
done: newStopper(),
beatMeta: makeMeta(beat),
tags: config.Tags,
Expand All @@ -171,18 +178,20 @@ func (r *reporter) initLoop(c config) {
debugf("Start monitoring endpoint init loop.")
defer debugf("Finish monitoring endpoint init loop.")

log := r.logger

logged := false

for {
// Select one configured endpoint by random and check if xpack is available
client := r.out.Clients[rand.Intn(len(r.out.Clients))].(outputs.NetworkClient)
err := client.Connect()
if err == nil {
closing(client)
closing(log, client)
break
} else {
if !logged {
logp.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
logged = true
}
debugf("Monitoring could not connect to elasticsearch, failed with %v", err)
Expand All @@ -195,7 +204,7 @@ func (r *reporter) initLoop(c config) {
}
}

logp.Info("Successfully connected to X-Pack Monitoring endpoint.")
log.Info("Successfully connected to X-Pack Monitoring endpoint.")

// Start collector and send loop if monitoring endpoint has been found.
go r.snapshotLoop("state", "state", c.StatePeriod)
Expand All @@ -207,8 +216,10 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration)
ticker := time.NewTicker(period)
defer ticker.Stop()

logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace)
log := r.logger

log.Infof("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer log.Infof("Stop monitoring %s metrics snapshot loop.", namespace)

for {
var ts time.Time
Expand Down Expand Up @@ -277,9 +288,9 @@ func makeClient(
return newPublishClient(esClient, params), nil
}

func closing(c io.Closer) {
func closing(log *logp.Logger, c io.Closer) {
if err := c.Close(); err != nil {
logp.Warn("Closed failed with: %v", err)
log.Warnf("Closed failed with: %v", err)
}
}

Expand Down
66 changes: 46 additions & 20 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ var publishDisabled = false

const defaultQueueType = "mem"

// Monitors configures visibility for observing state and progress of the
// pipeline.
type Monitors struct {
Metrics *monitoring.Registry
Telemetry *monitoring.Registry
Logger *logp.Logger
}

func init() {
flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing")
}
Expand All @@ -46,12 +54,17 @@ func init() {
// configured queue and outputs.
func Load(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
config Config,
outcfg common.ConfigNamespace,
) (*Pipeline, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
logp.Info("Dry run mode. All output types except the file based one are disabled.")
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
Expand Down Expand Up @@ -80,68 +93,76 @@ func Load(
},
}

queueBuilder, err := createQueueBuilder(config.Queue)
queueBuilder, err := createQueueBuilder(config.Queue, monitors)
if err != nil {
return nil, err
}

out, err := loadOutput(beatInfo, reg, outcfg)
out, err := loadOutput(beatInfo, monitors, outcfg)
if err != nil {
return nil, err
}

p, err := New(beatInfo, reg, queueBuilder, out, settings)
p, err := New(beatInfo, monitors.Metrics, queueBuilder, out, settings)
if err != nil {
return nil, err
}

logp.Info("Beat name: %s", name)
log.Info("Beat name: %s", name)
return p, err
}

func loadOutput(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
outcfg common.ConfigNamespace,
) (outputs.Group, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
return outputs.Group{}, nil
}

if !outcfg.IsSet() {
msg := "No outputs are defined. Please define one under the output section."
logp.Info(msg)
log.Info(msg)
return outputs.Fail(errors.New(msg))
}

// TODO: add support to unload/reassign outStats on output reloading

var (
outReg *monitoring.Registry
metrics *monitoring.Registry
outStats outputs.Observer
)
if reg != nil {
outReg = reg.NewRegistry("output")
outStats = outputs.NewStats(outReg)
if monitors.Metrics != nil {
metrics = monitors.Metrics.NewRegistry("output")
outStats = outputs.NewStats(metrics)
}

out, err := outputs.Load(beatInfo, outStats, outcfg.Name(), outcfg.Config())
if err != nil {
return outputs.Fail(err)
}

if outReg != nil {
monitoring.NewString(outReg, "type").Set(outcfg.Name())
if metrics != nil {
monitoring.NewString(metrics, "type").Set(outcfg.Name())
}
if monitors.Telemetry != nil {
telemetry := monitors.Telemetry.NewRegistry("output")
monitoring.NewString(telemetry, "name").Set(outcfg.Name())
}

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
outputRegistry := stateRegistry.NewRegistry("output")
monitoring.NewString(outputRegistry, "name").Set(outcfg.Name())

return out, nil
}

func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) {
func createQueueBuilder(
config common.ConfigNamespace,
monitors Monitors,
) (func(queue.Eventer) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
queueType = b
Expand All @@ -157,7 +178,12 @@ func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (que
queueConfig = common.NewConfig()
}

if monitors.Telemetry != nil {
queueReg := monitors.Telemetry.NewRegistry("queue")
monitoring.NewString(queueReg, "name").Set(queueType)
}

return func(eventer queue.Eventer) (queue.Queue, error) {
return queueFactory(eventer, queueConfig)
return queueFactory(eventer, monitors.Logger, queueConfig)
}, nil
}
9 changes: 7 additions & 2 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ func RunTests(
return fmt.Errorf("unpacking config failed: %v", err)
}

// reg := monitoring.NewRegistry()
pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output)
pipeline, err := pipeline.Load(info, pipeline.Monitors{
Metrics: nil,
Telemetry: nil,
Logger: logp.L(),
},
config.Pipeline,
config.Output)
if err != nil {
return fmt.Errorf("loading pipeline failed: %+v", err)
}
Expand Down
15 changes: 12 additions & 3 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,17 @@ func init() {
queue.RegisterType("mem", create)
}

func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (queue.Queue, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

return NewBroker(Settings{
if logger == nil {
logger = logp.L()
}

return NewBroker(logger, Settings{
Eventer: eventer,
Events: config.Events,
FlushMinEvents: config.FlushMinEvents,
Expand All @@ -105,6 +109,7 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
logger logger,
settings Settings,
) *Broker {
// define internal channel size for producer/client requests
Expand All @@ -128,9 +133,13 @@ func NewBroker(
minEvents = sz
}

if logger == nil {
logger = logp.NewLogger("memqueue")
}

b := &Broker{
done: make(chan struct{}),
logger: logp.NewLogger("memqueue"),
logger: logger,

// broker API channels
events: make(chan pushRequest, chanSize),
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestProducerCancelRemovesEvents(t *testing.T) {

func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory {
return func(_ *testing.T) queue.Queue {
return NewBroker(Settings{
return NewBroker(nil, Settings{
Events: sz,
FlushMinEvents: minEvents,
FlushTimeout: flushTimeout,
Expand Down
3 changes: 2 additions & 1 deletion libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

// Factory for creating a queue used by a pipeline instance.
type Factory func(Eventer, *common.Config) (Queue, error)
type Factory func(Eventer, *logp.Logger, *common.Config) (Queue, error)

// Eventer listens to special events to be send by queue implementations.
type Eventer interface {
Expand Down
10 changes: 8 additions & 2 deletions libbeat/publisher/queue/spool/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/feature"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/go-txfile"
Expand All @@ -38,7 +39,7 @@ func init() {
queue.RegisterType("spool", create)
}

func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue.Queue, error) {
cfgwarn.Beta("Spooling to disk is beta")

config := defaultConfig()
Expand All @@ -56,7 +57,12 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
flushEvents = uint(count)
}

return NewSpool(defaultLogger(), path, Settings{
var log logger = logp
if logp == nil {
log = defaultLogger()
}

return NewSpool(log, path, Settings{
Eventer: eventer,
Mode: config.File.Permissions,
WriteBuffer: uint(config.Write.BufferSize),
Expand Down

0 comments on commit 8c15e6e

Please sign in to comment.