Optional monitoring registry (elastic#5071)
- Update the pipeline to optionally accept a monitoring registry; if no
  registry is given, no metrics are collected.
- Turn the observer type into an interface, with sub-interfaces for the
  specific pipeline components that metrics are collected on.
- Introduce a nilObserver, used when no metrics are to be collected (a short
  sketch of the pattern follows below).
Steffen Siering authored and ruflin committed Sep 1, 2017
1 parent 518504c commit 84bfda1
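The heart of the change is a nil-object observer. The sketch below is editorial, not code from this commit: it strips the real interfaces down to a single method to show why reporting call sites need no nil checks when metrics collection is disabled.

package main

// Minimal, self-contained sketch of the pattern (simplified; the real
// observer interfaces in monitoring.go carry many more methods).
type outputObserver interface {
	eventsRetry(n int)
}

// emptyObserver is the no-op stand-in used when no monitoring registry is
// configured; a metrics-backed implementation would update counters here.
type emptyObserver struct{}

func (*emptyObserver) eventsRetry(int) {}

func main() {
	// The pipeline hands this out as its observer when metrics are off.
	var obs outputObserver = (*emptyObserver)(nil)
	obs.eventsRetry(5) // safe: nil receiver, the method touches no state
}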
Showing 6 changed files with 94 additions and 36 deletions.
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/batch.go
@@ -15,7 +15,7 @@ type Batch struct {
 }

 type batchContext struct {
-	observer *observer
+	observer outputObserver
 	retryer  *retryer
 }
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/controller.go
@@ -12,7 +12,7 @@ import (
 // - reload
 type outputController struct {
 	logger   *logp.Logger
-	observer *observer
+	observer outputObserver

 	queue queue.Queue

@@ -40,7 +40,7 @@ type outputWorker interface {

 func newOutputController(
 	log *logp.Logger,
-	observer *observer,
+	observer outputObserver,
 	b queue.Queue,
 ) *outputController {
 	c := &outputController{
@@ -52,7 +52,7 @@ func newOutputController(
 	ctx := &batchContext{}
 	c.consumer = newEventConsumer(log, b, ctx)
 	c.retryer = newRetryer(log, observer, nil, c.consumer)
-	ctx.observer = c.observer
+	ctx.observer = observer
 	ctx.retryer = c.retryer

 	c.consumer.sigContinue()
98 changes: 77 additions & 21 deletions libbeat/publisher/pipeline/monitoring.go
@@ -2,13 +2,48 @@ package pipeline

 import "github.com/elastic/beats/libbeat/monitoring"

 // observer is used by many component in the publisher pipeline, to report
+type observer interface {
+	pipelineObserver
+	clientObserver
+	queueObserver
+	outputObserver
+
+	cleanup()
+}
+
+type pipelineObserver interface {
+	clientConnected()
+	clientClosing()
+	clientClosed()
+}
+
+type clientObserver interface {
+	newEvent()
+	filteredEvent()
+	publishedEvent()
+	failedPublishEvent()
+}
+
+type queueObserver interface {
+	queueACKed(n int)
+}
+
+type outputObserver interface {
+	updateOutputGroup()
+	eventsFailed(int)
+	eventsDropped(int)
+	eventsRetry(int)
+	outBatchSend(int)
+	outBatchACKed(int)
+}
+
+// metricsObserver is used by many component in the publisher pipeline, to report
 // internal events. The oberserver can call registered global event handlers or
 // updated shared counters/metrics for reporting.
 // All events required for reporting events/metrics on the pipeline-global level
 // are defined by observer. The components are only allowed to serve localized
 // event-handlers only (e.g. the client centric events callbacks)
-type observer struct {
+type metricsObserver struct {
 	metrics *monitoring.Registry

 	// clients metrics
@@ -23,14 +58,13 @@ type observer struct {
 	ackedQueue *monitoring.Uint
 }

-func (o *observer) init(metrics *monitoring.Registry) {
-	o.metrics = metrics
+func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
 	reg := metrics.GetRegistry("pipeline")
 	if reg == nil {
 		reg = metrics.NewRegistry("pipeline")
 	}

-	*o = observer{
+	return &metricsObserver{
 		metrics: metrics,
 		clients: monitoring.NewUint(reg, "clients"),

@@ -47,46 +81,48 @@ func (o *observer) init(metrics *monitoring.Registry) {
 	}
 }

-func (o *observer) cleanup() {
-	o.metrics.Remove("pipeline") // drop all metrics from registry
+func (o *metricsObserver) cleanup() {
+	if o.metrics != nil {
+		o.metrics.Remove("pipeline") // drop all metrics from registry
+	}
 }

 //
 // client connects/disconnects
 //

 // (pipeline) pipeline did finish creating a new client instance
-func (o *observer) clientConnected() { o.clients.Inc() }
+func (o *metricsObserver) clientConnected() { o.clients.Inc() }

 // (client) close being called on client
-func (o *observer) clientClosing() {}
+func (o *metricsObserver) clientClosing() {}

 // (client) client finished processing close
-func (o *observer) clientClosed() { o.clients.Dec() }
+func (o *metricsObserver) clientClosed() { o.clients.Dec() }

 //
 // client publish events
 //

 // (client) client is trying to publish a new event
-func (o *observer) newEvent() {
+func (o *metricsObserver) newEvent() {
 	o.events.Inc()
 	o.activeEvents.Inc()
 }

 // (client) event is filtered out (on purpose or failed)
-func (o *observer) filteredEvent() {
+func (o *metricsObserver) filteredEvent() {
 	o.filtered.Inc()
 	o.activeEvents.Dec()
 }

 // (client) managed to push an event into the publisher pipeline
-func (o *observer) publishedEvent() {
+func (o *metricsObserver) publishedEvent() {
 	o.published.Inc()
 }

 // (client) client closing down or DropIfFull is set
-func (o *observer) failedPublishEvent() {
+func (o *metricsObserver) failedPublishEvent() {
 	o.failed.Inc()
 	o.activeEvents.Dec()
 }
@@ -96,7 +132,7 @@ func (o *observer) failedPublishEvent() {
 //

 // (queue) number of events ACKed by the queue/broker in use
-func (o *observer) queueACKed(n int) {
+func (o *metricsObserver) queueACKed(n int) {
 	o.ackedQueue.Add(uint64(n))
 	o.activeEvents.Sub(uint64(n))
 }
@@ -106,23 +142,43 @@ func (o *observer) queueACKed(n int) {
 //

 // (controller) new output group is about to be loaded
-func (o *observer) updateOutputGroup() {}
+func (o *metricsObserver) updateOutputGroup() {}

 // (retryer) new failed batch has been received
-func (o *observer) eventsFailed(int) {}
+func (o *metricsObserver) eventsFailed(int) {}

 // (retryer) number of events dropped by retryer
-func (o *observer) eventsDropped(n int) {
+func (o *metricsObserver) eventsDropped(n int) {
 	o.dropped.Add(uint64(n))
 }

 // (retryer) number of events pushed to the output worker queue
-func (o *observer) eventsRetry(n int) {
+func (o *metricsObserver) eventsRetry(n int) {
 	o.retry.Add(uint64(n))
 }

 // (output) number of events to be forwarded to the output client
-func (o *observer) outBatchSend(int) {}
+func (o *metricsObserver) outBatchSend(int) {}

 // (output) number of events acked by the output batch
-func (o *observer) outBatchACKed(int) {}
+func (o *metricsObserver) outBatchACKed(int) {}
+
+type emptyObserver struct{}
+
+var nilObserver observer = (*emptyObserver)(nil)
+
+func (*emptyObserver) cleanup()            {}
+func (*emptyObserver) clientConnected()    {}
+func (*emptyObserver) clientClosing()      {}
+func (*emptyObserver) clientClosed()       {}
+func (*emptyObserver) newEvent()           {}
+func (*emptyObserver) filteredEvent()      {}
+func (*emptyObserver) publishedEvent()     {}
+func (*emptyObserver) failedPublishEvent() {}
+func (*emptyObserver) queueACKed(n int)    {}
+func (*emptyObserver) updateOutputGroup()  {}
+func (*emptyObserver) eventsFailed(int)    {}
+func (*emptyObserver) eventsDropped(int)   {}
+func (*emptyObserver) eventsRetry(int)     {}
+func (*emptyObserver) outBatchSend(int)    {}
+func (*emptyObserver) outBatchACKed(int)   {}
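For orientation, here is a hedged sketch of how newMetricsObserver above uses the registry it is given. The GetRegistry/NewRegistry("pipeline") and NewUint calls mirror the diff; the package-level monitoring.NewRegistry() constructor used to obtain a root registry is an assumption about the libbeat monitoring API, not something this commit shows.

package main

import "github.com/elastic/beats/libbeat/monitoring"

func main() {
	// Assumption: monitoring.NewRegistry() creates an empty root registry.
	metrics := monitoring.NewRegistry()

	// What newMetricsObserver does with it: reuse or create the "pipeline"
	// sub-registry and register its counters there.
	reg := metrics.GetRegistry("pipeline")
	if reg == nil {
		reg = metrics.NewRegistry("pipeline")
	}

	clients := monitoring.NewUint(reg, "clients")
	clients.Inc() // e.g. what clientConnected() reports
}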
8 changes: 4 additions & 4 deletions libbeat/publisher/pipeline/output.go
@@ -8,15 +8,15 @@ import (

 // clientWorker manages output client of type outputs.Client, not supporting reconnect.
 type clientWorker struct {
-	observer *observer
+	observer outputObserver
 	qu       workQueue
 	client   outputs.Client
 	closed   atomic.Bool
 }

 // netClientWorker manages reconnectable output clients of type outputs.NetworkClient.
 type netClientWorker struct {
-	observer *observer
+	observer outputObserver
 	qu       workQueue
 	client   outputs.NetworkClient
 	closed   atomic.Bool
@@ -25,13 +25,13 @@ type netClientWorker struct {
 	batchSizer func() int
 }

-func makeClientWorker(observer *observer, qu workQueue, client outputs.Client) outputWorker {
+func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker {
 	if nc, ok := client.(outputs.NetworkClient); ok {
 		c := &netClientWorker{observer: observer, qu: qu, client: nc}
 		go c.run()
 		return c
 	}
-	c := &clientWorker{qu: qu, client: client}
+	c := &clientWorker{observer: observer, qu: qu, client: client}
 	go c.run()
 	return c
 }
12 changes: 7 additions & 5 deletions libbeat/publisher/pipeline/pipeline.go
@@ -111,7 +111,7 @@ type pipelineEventer struct {
 	mutex      sync.Mutex
 	modifyable bool

-	observer  *observer
+	observer  queueObserver
 	waitClose *waitCloser
 	cb        *pipelineEventCB
 }
@@ -157,6 +157,7 @@ func New(
 	log := defaultLogger
 	p := &Pipeline{
 		logger:           log,
+		observer:         nilObserver,
 		waitCloseMode:    settings.WaitCloseMode,
 		waitCloseTimeout: settings.WaitClose,
 		processors: pipelineProcessors{
@@ -169,7 +170,10 @@ func New(
 	p.ackBuilder = &pipelineEmptyACK{p}
 	p.ackActive = atomic.MakeBool(true)

-	p.eventer.observer = &p.observer
+	if metrics != nil {
+		p.observer = newMetricsObserver(metrics)
+	}
+	p.eventer.observer = p.observer
 	p.eventer.modifyable = true

 	if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
@@ -185,9 +189,7 @@ func New(
 	}
 	p.eventSema = newSema(p.queue.BufferConfig().Events)

-	p.observer.init(metrics)
-
-	p.output = newOutputController(log, &p.observer, p.queue)
+	p.output = newOutputController(log, p.observer, p.queue)
 	p.output.Set(out)

 	return p, nil
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/retry.go
@@ -14,7 +14,7 @@ import (
 // outputs.
 type retryer struct {
 	logger   *logp.Logger
-	observer *observer
+	observer outputObserver

 	done chan struct{}

@@ -54,7 +54,7 @@ const (

 func newRetryer(
 	log *logp.Logger,
-	observer *observer,
+	observer outputObserver,
 	out workQueue,
 	c *eventConsumer,
 ) *retryer {
