beater: move event ACKer from publisher to beater (elastic#4617)
In the future we will start and stop multiple publishers.
To avoid blocking the publisher.Stop method unnecessarily
during a config reload, move event ACKing to beater.
axw committed Feb 17, 2021
1 parent bd0a9c3 commit cb3cda8
Showing 3 changed files with 30 additions and 24 deletions.
5 changes: 4 additions & 1 deletion publish/acker.go → beater/acker.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package publish
package beater

import (
"context"
@@ -29,6 +29,8 @@ import (
// events published. waitPublishedAcker provides an interruptible Wait method
// that blocks until all events published at the time the client is closed are
// acknowledged.
//
// TODO(axw) move this to libbeat/common/acker.
type waitPublishedAcker struct {
active int64 // atomic

@@ -37,6 +39,7 @@ type waitPublishedAcker struct {
done chan struct{}
}

// newWaitPublishedAcker returns a new waitPublishedAcker.
func newWaitPublishedAcker() *waitPublishedAcker {
return &waitPublishedAcker{done: make(chan struct{})}
}
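The rest of acker.go is collapsed in the view above. For orientation, here is a minimal, hypothetical sketch of how an interruptible "wait for published events" acker can be built; it is not the file's actual implementation. The real waitPublishedAcker satisfies libbeat's beat.ACKer interface (roughly AddEvent/ACKEvents/Close) and, as the visible hunk shows, keeps its count in an atomic `active` field; the sketch below drops the beat.Event parameter and uses a plain mutex so it stays self-contained.

// Illustrative sketch only; not part of the repository.
package beater

import (
	"context"
	"sync"
)

// interruptibleAcker is a hypothetical stand-in for waitPublishedAcker:
// it counts published events and lets a caller block, with context
// cancellation, until everything counted before Close is acknowledged.
type interruptibleAcker struct {
	mu     sync.Mutex
	active int  // published but not yet acknowledged
	closed bool // no more events will be added
	done   chan struct{}
}

func newInterruptibleAcker() *interruptibleAcker {
	return &interruptibleAcker{done: make(chan struct{})}
}

// AddEvent records one event handed to the pipeline.
func (a *interruptibleAcker) AddEvent(published bool) {
	if !published {
		return
	}
	a.mu.Lock()
	a.active++
	a.mu.Unlock()
}

// ACKEvents records n acknowledged events.
func (a *interruptibleAcker) ACKEvents(n int) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.active -= n
	a.maybeSignal()
}

// Close marks the acker as complete: once the outstanding events are
// acknowledged, Wait may return.
func (a *interruptibleAcker) Close() {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.closed = true
	a.maybeSignal()
}

// maybeSignal closes done exactly once, when the acker is closed and
// nothing remains outstanding. Callers must hold a.mu.
func (a *interruptibleAcker) maybeSignal() {
	if !a.closed || a.active > 0 {
		return
	}
	select {
	case <-a.done:
	default:
		close(a.done)
	}
}

// Wait blocks until all events added before Close are acknowledged,
// or until ctx is cancelled.
func (a *interruptibleAcker) Wait(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.done:
		return nil
	}
}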
17 changes: 11 additions & 6 deletions beater/beater.go
@@ -36,6 +36,7 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/elasticsearch"
@@ -73,6 +74,7 @@ func NewCreator(args CreatorParams) beat.Creator {
stopped: false,
logger: logger,
wrapRunServer: args.WrapRunServer,
waitPublished: newWaitPublishedAcker(),
}

esOutputCfg := elasticsearchOutputConfig(b)
@@ -103,6 +105,7 @@ type beater struct {
logger *logp.Logger
namespace string
wrapRunServer func(RunServerFunc) RunServerFunc
waitPublished *waitPublishedAcker

mutex sync.Mutex // guards stopServer and stopped
stopServer func()
@@ -173,7 +176,7 @@ func (bt *beater) Run(b *beat.Beat) error {
runServer = bt.wrapRunServer(runServer)
}

publisher, err := newPublisher(b, bt.config, bt.namespace, tracer)
publisher, err := bt.newPublisher(b, tracer)
if err != nil {
return err
}
@@ -187,6 +190,7 @@ func (bt *beater) Run(b *beat.Beat) error {
defer cancelShutdownContext()
}
publisher.Stop(shutdownContext)
bt.waitPublished.Wait(shutdownContext)
}()

reporter := publisher.Send
@@ -396,18 +400,19 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}
}

func newPublisher(b *beat.Beat, cfg *config.Config, namespace string, tracer *apm.Tracer) (*publish.Publisher, error) {
transformConfig, err := newTransformConfig(b.Info, cfg)
func (bt *beater) newPublisher(b *beat.Beat, tracer *apm.Tracer) (*publish.Publisher, error) {
transformConfig, err := newTransformConfig(b.Info, bt.config)
if err != nil {
return nil, err
}
publisherConfig := &publish.PublisherConfig{
Info: b.Info,
Pipeline: cfg.Pipeline,
Namespace: namespace,
Pipeline: bt.config.Pipeline,
Namespace: bt.namespace,
TransformConfig: transformConfig,
}
return publish.NewPublisher(b.Publisher, tracer, publisherConfig)
pipeline := pipetool.WithACKer(b.Publisher, bt.waitPublished)
return publish.NewPublisher(pipeline, tracer, publisherConfig)
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
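Pulling the beater.go hunks above into one place: the waitPublishedAcker is created in NewCreator, newPublisher wraps the beat's pipeline with pipetool.WithACKer so the ACKer is owned by the beater, and Run waits on it only after the publisher has been stopped. A condensed, illustrative sketch (hypothetical helper name; server startup, error handling for Stop/Wait, and the rest of Run are omitted):

func (bt *beater) runPublisherSketch(shutdownContext context.Context, b *beat.Beat, tracer *apm.Tracer) error {
	// newPublisher (see hunk above) now wraps b.Publisher with
	// pipetool.WithACKer(b.Publisher, bt.waitPublished) before calling
	// publish.NewPublisher, so ACK tracking no longer lives in the publisher.
	publisher, err := bt.newPublisher(b, tracer)
	if err != nil {
		return err
	}
	defer func() {
		// Drain and close the publisher first; Stop no longer waits for ACKs.
		publisher.Stop(shutdownContext)
		// Then wait in the beater for the pipeline to acknowledge the
		// events that were published.
		bt.waitPublished.Wait(shutdownContext)
	}()

	// ... run the server(s), reporting events via publisher.Send ...
	return nil
}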
32 changes: 15 additions & 17 deletions publish/pub.go
@@ -34,18 +34,19 @@ import (

type Reporter func(context.Context, PendingReq) error

// Publisher forwards batches of events to libbeat. It uses GuaranteedSend
// to enable infinite retry of events being processed.
// Publisher forwards batches of events to libbeat.
//
// If the publisher's input channel is full, an error is returned immediately.
// Number of concurrent requests waiting for processing do depend on the configured
// queue size. As the publisher is not waiting for the outputs ACK, the total
// number requests(events) active in the system can exceed the queue size. Only
// the number of concurrent HTTP requests trying to publish at the same time is limited.
// Publisher uses GuaranteedSend to enable infinite retry of events being processed.
//
// The number of concurrent requests waiting for processing depends on the configured
// queue size in libbeat. As the publisher is not waiting for the outputs ACK, the total
// number of events active in the system can exceed the queue size. Only the number of
// concurrent HTTP requests trying to publish at the same time is limited.
type Publisher struct {
stopped chan struct{}
tracer *apm.Tracer
client beat.Client
waitPublished *waitPublishedAcker
transformConfig *transform.Config

mu sync.RWMutex
@@ -80,8 +81,9 @@ var (
)

// newPublisher creates a new publisher instance.
//MaxCPU new go-routines are started for forwarding events to libbeat.
//Stop must be called to close the beat.Client and free resources.
//
// GOMAXPROCS goroutines are started for forwarding events to libbeat.
// Stop must be called to close the beat.Client and free resources.
func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConfig) (*Publisher, error) {
if err := cfg.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid config")
@@ -110,7 +112,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf
p := &Publisher{
tracer: tracer,
stopped: make(chan struct{}),
waitPublished: newWaitPublishedAcker(),
transformConfig: cfg.TransformConfig,

// One request will be actively processed by the
@@ -121,7 +122,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf
client, err := pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
Processing: processingCfg,
ACKHandler: p.waitPublished,
})
if err != nil {
return nil, err
@@ -149,7 +149,9 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf
// indefinitely.
//
// The worker will drain the queue on shutdown, but no more requests will be
// published after Stop returns.
// published after Stop returns. Events may still exist in the libbeat pipeline
// after Stop returns; the caller is responsible for installing an ACKer as
// necessary.
func (p *Publisher) Stop(ctx context.Context) error {
// Prevent additional requests from being enqueued.
p.mu.Lock()
@@ -163,16 +165,12 @@ func (p *Publisher) Stop(ctx context.Context) error {
// important here:
// (1) wait for pendingRequests to be drained and published (p.stopped)
// (2) close the beat.Client to prevent more events being published
// (3) wait for published events to be acknowledged
select {
case <-ctx.Done():
return ctx.Err()
case <-p.stopped:
}
if err := p.client.Close(); err != nil {
return err
}
return p.waitPublished.Wait(ctx)
return p.client.Close()
}

// Send tries to forward pendingReq to the publishers worker. If the queue is full,
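After this change Publisher.Stop does exactly two things: wait for the worker to drain the pending requests, then close the beat.Client. The third step, waiting for the pipeline's ACKs, is now the caller's responsibility, handled through whatever beat.ACKer the caller installed on the pipeline (in apm-server, the beater's waitPublishedAcker). A minimal caller-side sketch of the new contract, with hypothetical variable names:

// acker was installed with pipetool.WithACKer before publish.NewPublisher
// was called; its Wait method is specific to that type, not part of
// beat.ACKer.
if err := publisher.Stop(ctx); err != nil { // drain the queue, close the beat.Client
	return err
}
if err := acker.Wait(ctx); err != nil { // ACK wait now happens here, in the caller
	return err
}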
