diff --git a/publish/acker.go b/beater/acker.go similarity index 94% rename from publish/acker.go rename to beater/acker.go index 08a0c226733..8ddc5feb596 100644 --- a/publish/acker.go +++ b/beater/acker.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package publish +package beater import ( "context" @@ -29,6 +29,8 @@ import ( // events published. waitPublishedAcker provides an interruptible Wait method // that blocks until all events published at the time the client is closed are // acknowledged. +// +// TODO(axw) move this to libbeat/common/acker. type waitPublishedAcker struct { active int64 // atomic @@ -37,6 +39,7 @@ type waitPublishedAcker struct { done chan struct{} } +// newWaitPublishedAcker returns a new waitPublishedAcker. func newWaitPublishedAcker() *waitPublishedAcker { return &waitPublishedAcker{done: make(chan struct{})} } diff --git a/beater/beater.go b/beater/beater.go index 4952cfa266e..c4e721f910b 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -36,6 +36,7 @@ import ( "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/logp" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/elasticsearch" @@ -73,6 +74,7 @@ func NewCreator(args CreatorParams) beat.Creator { stopped: false, logger: logger, wrapRunServer: args.WrapRunServer, + waitPublished: newWaitPublishedAcker(), } esOutputCfg := elasticsearchOutputConfig(b) @@ -103,6 +105,7 @@ type beater struct { logger *logp.Logger namespace string wrapRunServer func(RunServerFunc) RunServerFunc + waitPublished *waitPublishedAcker mutex sync.Mutex // guards stopServer and stopped stopServer func() @@ -173,7 +176,7 @@ func (bt *beater) Run(b *beat.Beat) error { runServer = bt.wrapRunServer(runServer) } - publisher, err := newPublisher(b, bt.config, 
bt.namespace, tracer) + publisher, err := bt.newPublisher(b, tracer) if err != nil { return err } @@ -187,6 +190,7 @@ func (bt *beater) Run(b *beat.Beat) error { defer cancelShutdownContext() } publisher.Stop(shutdownContext) + bt.waitPublished.Wait(shutdownContext) }() reporter := publisher.Send @@ -396,18 +400,19 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } } -func newPublisher(b *beat.Beat, cfg *config.Config, namespace string, tracer *apm.Tracer) (*publish.Publisher, error) { - transformConfig, err := newTransformConfig(b.Info, cfg) +func (bt *beater) newPublisher(b *beat.Beat, tracer *apm.Tracer) (*publish.Publisher, error) { + transformConfig, err := newTransformConfig(b.Info, bt.config) if err != nil { return nil, err } publisherConfig := &publish.PublisherConfig{ Info: b.Info, - Pipeline: cfg.Pipeline, - Namespace: namespace, + Pipeline: bt.config.Pipeline, + Namespace: bt.namespace, TransformConfig: transformConfig, } - return publish.NewPublisher(b.Publisher, tracer, publisherConfig) + pipeline := pipetool.WithACKer(b.Publisher, bt.waitPublished) + return publish.NewPublisher(pipeline, tracer, publisherConfig) } func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) { diff --git a/publish/pub.go b/publish/pub.go index 84733ad3031..a44f4034c46 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -34,18 +34,19 @@ import ( type Reporter func(context.Context, PendingReq) error -// Publisher forwards batches of events to libbeat. It uses GuaranteedSend -// to enable infinite retry of events being processed. +// Publisher forwards batches of events to libbeat. +// // If the publisher's input channel is full, an error is returned immediately. -// Number of concurrent requests waiting for processing do depend on the configured -// queue size. As the publisher is not waiting for the outputs ACK, the total -// number requests(events) active in the system can exceed the queue size. 
Only -// the number of concurrent HTTP requests trying to publish at the same time is limited. +// Publisher uses GuaranteedSend to enable infinite retry of events being processed. +// +// The number of concurrent requests waiting for processing depends on the configured +// queue size in libbeat. As the publisher is not waiting for the outputs ACK, the total +// number of events active in the system can exceed the queue size. Only the number of +// concurrent HTTP requests trying to publish at the same time is limited. type Publisher struct { stopped chan struct{} tracer *apm.Tracer client beat.Client - waitPublished *waitPublishedAcker transformConfig *transform.Config mu sync.RWMutex @@ -80,8 +81,9 @@ var ( ) -// newPublisher creates a new publisher instance. -//MaxCPU new go-routines are started for forwarding events to libbeat. -//Stop must be called to close the beat.Client and free resources. +// NewPublisher creates a new publisher instance. +// +// GOMAXPROCS goroutines are started for forwarding events to libbeat. +// Stop must be called to close the beat.Client and free resources. func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConfig) (*Publisher, error) { if err := cfg.Validate(); err != nil { return nil, errors.Wrap(err, "invalid config") @@ -110,7 +112,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf p := &Publisher{ tracer: tracer, stopped: make(chan struct{}), - waitPublished: newWaitPublishedAcker(), transformConfig: cfg.TransformConfig, // One request will be actively processed by the @@ -121,7 +122,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf client, err := pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, Processing: processingCfg, - ACKHandler: p.waitPublished, }) if err != nil { return nil, err @@ -149,7 +149,9 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf // indefinitely.
// // The worker will drain the queue on shutdown, but no more requests will be -// published after Stop returns. +// published after Stop returns. Events may still exist in the libbeat pipeline +// after Stop returns; the caller is responsible for installing an ACKer as +// necessary. func (p *Publisher) Stop(ctx context.Context) error { // Prevent additional requests from being enqueued. p.mu.Lock() @@ -163,16 +165,12 @@ func (p *Publisher) Stop(ctx context.Context) error { // important here: // (1) wait for pendingRequests to be drained and published (p.stopped) // (2) close the beat.Client to prevent more events being published - // (3) wait for published events to be acknowledged select { case <-ctx.Done(): return ctx.Err() case <-p.stopped: } - if err := p.client.Close(); err != nil { - return err - } - return p.waitPublished.Wait(ctx) + return p.client.Close() } // Send tries to forward pendingReq to the publishers worker. If the queue is full,