diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index f703f28f685..2a15ea86e16 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -105,7 +106,8 @@ func (c *outputController) Set(outGrp outputs.Group) { clients := outGrp.Clients worker := make([]outputWorker, len(clients)) for i, client := range clients { - worker[i] = makeClientWorker(c.observer, c.workQueue, client, c.monitors.Tracer) + logger := logp.NewLogger("publisher_pipeline_output") + worker[i] = makeClientWorker(c.observer, c.workQueue, client, logger, c.monitors.Tracer) } grp := &outputGroup{ workQueue: c.workQueue, diff --git a/libbeat/publisher/pipeline/logger.go b/libbeat/publisher/pipeline/logger.go new file mode 100644 index 00000000000..2bb0b4ad5e9 --- /dev/null +++ b/libbeat/publisher/pipeline/logger.go @@ -0,0 +1,29 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +type logger interface { + Debug(vs ...interface{}) + Debugf(fmt string, vs ...interface{}) + + Info(vs ...interface{}) + Infof(fmt string, vs ...interface{}) + + Error(vs ...interface{}) + Errorf(fmt string, vs ...interface{}) +} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index ffc5acfa6ad..febccdc7067 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -25,7 +25,6 @@ import ( "go.elastic.co/apm" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" ) @@ -49,12 +48,12 @@ type netClientWorker struct { batchSize int batchSizer func() int - logger *logp.Logger + logger logger tracer *apm.Tracer } -func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client, tracer *apm.Tracer) outputWorker { +func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker { w := worker{ observer: observer, qu: qu, @@ -70,7 +69,7 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie c = &netClientWorker{ worker: w, client: nc, - logger: logp.NewLogger("publisher_pipeline_output"), + logger: logger, tracer: tracer, } } else { diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index ea0d0f96515..f150a909320 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -18,7 +18,9 @@ package pipeline import ( + "fmt" "math" + "strings" "sync" "testing" "testing/quick" @@ -30,7 +32,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/internal/testutil" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" ) @@ -49,8 +50,10 @@ func TestMakeClientWorker(t *testing.T) { numBatches := 300 + (i % 100) // between 300 and 399 numEvents := atomic.MakeUint(0) + logger := makeBufLogger(t) + wqu := makeWorkQueue() - retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) + retryer := newRetryer(logger, nilObserver, wqu, nil) defer retryer.close() var published atomic.Uint @@ -61,7 +64,7 @@ func TestMakeClientWorker(t *testing.T) { client := ctor(publishFn) - worker := makeClientWorker(nilObserver, wqu, client, nil) + worker := makeClientWorker(nilObserver, wqu, client, logger, nil) defer worker.Close() for i := uint(0); i < numBatches; i++ { @@ -74,9 +77,14 @@ func TestMakeClientWorker(t *testing.T) { timeout := 20 * time.Second // Make sure that all events have eventually been published - return waitUntilTrue(timeout, func() bool { + success := waitUntilTrue(timeout, func() bool { return numEvents == published }) + if !success { + logger.Flush() + t.Logf("numBatches = %v, numEvents = %v, published = %v", numBatches, numEvents, published) + } + return success }, nil) if err != nil { @@ -102,8 +110,10 @@ func TestReplaceClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 1000 + (i % 100) // between 1000 and 1099 + logger := makeBufLogger(t) + wqu := makeWorkQueue() - retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) + retryer := newRetryer(logger, nilObserver, wqu, nil) defer retryer.close() var batches []publisher.Batch @@ -140,7 +150,7 @@ func TestReplaceClientWorker(t *testing.T) { } client := ctor(blockingPublishFn) - worker := makeClientWorker(nilObserver, wqu, client, nil) + worker := makeClientWorker(nilObserver, wqu, client, logger, nil) // Allow the worker to make *some* progress before we close it timeout := 10 * time.Second @@ -165,14 +175,20 @@ func TestReplaceClientWorker(t *testing.T) { } client = ctor(countingPublishFn) - makeClientWorker(nilObserver, wqu, client, nil) + makeClientWorker(nilObserver, wqu, client, logger, nil) wg.Wait() // Make sure that all events have eventually been published timeout = 20 * time.Second - return waitUntilTrue(timeout, func() bool { + success := waitUntilTrue(timeout, func() bool { return numEvents == int(publishedFirst.Load()+publishedLater.Load()) }) + if !success { + logger.Flush() + t.Logf("numBatches = %v, numEvents = %v, publishedFirst = %v, publishedLater = %v", + numBatches, numEvents, publishedFirst.Load(), publishedLater.Load()) + } + return success }, &quick.Config{MaxCount: 25}) if err != nil { @@ -188,8 +204,10 @@ func TestMakeClientTracer(t *testing.T) { numBatches := 10 numEvents := atomic.MakeUint(0) + logger := makeBufLogger(t) + wqu := makeWorkQueue() - retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) + retryer := newRetryer(logger, nilObserver, wqu, nil) defer retryer.close() var published atomic.Uint @@ -203,7 +221,7 @@ func TestMakeClientTracer(t *testing.T) { recorder := apmtest.NewRecordingTracer() defer recorder.Close() - worker := makeClientWorker(nilObserver, wqu, client, recorder.Tracer) + worker := makeClientWorker(nilObserver, wqu, client, logger, recorder.Tracer) defer worker.Close() for i := 0; i < numBatches; i++ { @@ -227,6 +245,56 @@ func TestMakeClientTracer(t *testing.T) { apmEvents := recorder.Payloads() transactions := apmEvents.Transactions if len(transactions) != numBatches { + logger.Flush() t.Errorf("expected %d traces, got %d", numBatches, len(transactions)) } } + +// bufLogger is a buffered logger. It does not immediately print out log lines; instead it +// buffers them. To print them out, one must explicitly call it's Flush() method. This is +// useful when you want to see the logs only when tests fail but not when they pass. +type bufLogger struct { + t *testing.T + lines []string + mu sync.RWMutex +} + +func (l *bufLogger) Debug(vs ...interface{}) { l.report("DEBUG", vs) } +func (l *bufLogger) Debugf(fmt string, vs ...interface{}) { l.reportf("DEBUG ", fmt, vs) } + +func (l *bufLogger) Info(vs ...interface{}) { l.report("INFO", vs) } +func (l *bufLogger) Infof(fmt string, vs ...interface{}) { l.reportf("INFO", fmt, vs) } + +func (l *bufLogger) Error(vs ...interface{}) { l.report("ERROR", vs) } +func (l *bufLogger) Errorf(fmt string, vs ...interface{}) { l.reportf("ERROR", fmt, vs) } + +func (l *bufLogger) report(level string, vs []interface{}) { + str := strings.TrimRight(strings.Repeat("%v ", len(vs)), " ") + l.reportf(level, str, vs) +} +func (l *bufLogger) reportf(level, str string, vs []interface{}) { + str = level + ": " + str + line := fmt.Sprintf(str, vs...) + + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, line) +} + +func (l *bufLogger) Flush() { + l.mu.Lock() + defer l.mu.Unlock() + + for _, line := range l.lines { + l.t.Log(line) + } + + l.lines = make([]string, 0) +} + +func makeBufLogger(t *testing.T) *bufLogger { + return &bufLogger{ + t: t, + lines: make([]string, 0), + } +} diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index 0d724e80278..77f439f2fad 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -19,8 +19,6 @@ package pipeline import ( "sync" - - "github.com/elastic/beats/v7/libbeat/logp" ) // retryer is responsible for accepting and managing failed send attempts. It @@ -31,7 +29,7 @@ import ( // will the consumer be paused, until some batches have been processed by some // outputs. type retryer struct { - logger *logp.Logger + logger logger observer outputObserver done chan struct{} @@ -77,7 +75,7 @@ const ( ) func newRetryer( - log *logp.Logger, + log logger, observer outputObserver, out workQueue, c interruptor,