From d7ffd0247ad211379ff9259f5f7e5cbe57df0943 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Apr 2020 21:17:13 -0700 Subject: [PATCH 01/36] Adding unit tests for publisher output --- libbeat/publisher/pipeline/output_test.go | 143 ++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 libbeat/publisher/pipeline/output_test.go diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go new file mode 100644 index 00000000000..b576f7b1a8d --- /dev/null +++ b/libbeat/publisher/pipeline/output_test.go @@ -0,0 +1,143 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/common/atomic" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/publisher/queue" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" +) + +func TestPublish(t *testing.T) { + tests := map[string]struct { + client outputs.Client + }{ + "client": { + &mockClient{}, + }, + "network_client": { + &mockNetworkClient{}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + wqu := makeWorkQueue() + makeClientWorker(nilObserver, wqu, test.client) + + numEvents := atomic.MakeInt(0) + var wg sync.WaitGroup + for batchIdx := 0; batchIdx <= randIntBetween(25, 200); batchIdx++ { + wg.Add(1) + go func() { + defer wg.Done() + batch := randomBatch(50, 150, wqu) + + numEvents.Add(len(batch.Events())) + + wqu <- batch + }() + } + wg.Wait() + + // Give some time for events to be published + time.Sleep(time.Duration(numEvents.Load()*2) * time.Microsecond) + + c := test.client.(interface{ Published() int }) + assert.Equal(t, numEvents.Load(), c.Published()) + }) + } +} + +type mockClient struct{ published int } + +func (c *mockClient) Published() int { return c.published } + +func (c *mockClient) String() string { return "mock_client" } +func (c *mockClient) Close() error { return nil } +func (c *mockClient) Publish(batch publisher.Batch) error { + c.published += len(batch.Events()) + return nil +} + +type mockNetworkClient struct{ published int } + +func (c *mockNetworkClient) Published() int { return c.published } + +func (c *mockNetworkClient) String() string { return "mock_network_client" } +func (c *mockNetworkClient) Close() error { return nil } +func (c *mockNetworkClient) Publish(batch publisher.Batch) error { + c.published += len(batch.Events()) + return nil +} +func (c *mockNetworkClient) Connect() error { return nil } + +type mockQueue struct{} + +func (q mockQueue) Close() error { return nil } +func (q mockQueue) BufferConfig() queue.BufferConfig { return queue.BufferConfig{} } +func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} } +func (q mockQueue) Consumer() queue.Consumer { return mockConsumer{} } + +type mockProducer struct{} + +func (p mockProducer) Publish(event publisher.Event) bool { return true } +func (p mockProducer) TryPublish(event publisher.Event) bool { return true } +func (p mockProducer) Cancel() int { return 0 } + +type mockConsumer struct{} + +func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil } +func (c mockConsumer) Close() error { return nil } + +func randomBatch(min, max int, wqu workQueue) *Batch { + numEvents := randIntBetween(min, max) + events := make([]publisher.Event, numEvents) + + consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{}) + retryer := newRetryer(logp.L(), nilObserver, wqu, consumer) + + batch := Batch{ + events: events, + ctx: &batchContext{ + observer: nilObserver, + retryer: retryer, + }, + } + + return &batch +} + +// randIntBetween returns a random integer in [min, max) +func randIntBetween(min, max int) int { + return rand.Intn(max-min) + min +} From 7e49fcee17a516a896722cda74280f556822de86 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Apr 2020 21:21:31 -0700 Subject: [PATCH 02/36] Adding another unit test (TODO) --- libbeat/publisher/pipeline/output_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index b576f7b1a8d..27abbb2157b 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -78,6 +78,12 @@ func TestPublish(t *testing.T) { } } +func TestPublishWithClose(t *testing.T) { + // Test that multiple batches are all published, even if + // the worker is closed midway + // TODO +} + type mockClient struct{ published int } func (c *mockClient) Published() int { return c.published } From 5627ff993b6bcb5054ac9341655bfc09a4d627d0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Apr 2020 22:07:59 -0700 Subject: [PATCH 03/36] Adding unit test for closing worker midway --- libbeat/publisher/pipeline/output_test.go | 44 ++++++++++++++++++++--- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 27abbb2157b..f155173cce5 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/stretchr/testify/assert" @@ -70,8 +72,9 @@ func TestPublish(t *testing.T) { wg.Wait() // Give some time for events to be published - time.Sleep(time.Duration(numEvents.Load()*2) * time.Microsecond) + time.Sleep(time.Duration(numEvents.Load()*3) * time.Microsecond) + // Make sure that all events have eventually been published c := test.client.(interface{ Published() int }) assert.Equal(t, numEvents.Load(), c.Published()) }) @@ -79,9 +82,42 @@ func TestPublish(t *testing.T) { } func TestPublishWithClose(t *testing.T) { - // Test that multiple batches are all published, even if - // the worker is closed midway - // TODO + rand.Seed(time.Now().UnixNano()) + + wqu := makeWorkQueue() + client := &mockNetworkClient{} + worker := makeClientWorker(nilObserver, wqu, client) + + numEvents := atomic.MakeInt(0) + var wg sync.WaitGroup + for batchIdx := 0; batchIdx <= randIntBetween(25, 200); batchIdx++ { + wg.Add(1) + go func() { + defer wg.Done() + batch := randomBatch(50, 150, wqu) + + numEvents.Add(len(batch.Events())) + + wqu <- batch + }() + } + + // Close worker before all batches have had time to be published + err := worker.Close() + require.NoError(t, err) + + remaining := numEvents.Load() - client.Published() + assert.Greater(t, remaining, 0) + + // Start new worker to drain work queue + makeClientWorker(nilObserver, wqu, client) + wg.Wait() + + // Give some time for events to be published + time.Sleep(time.Duration(remaining*3) * time.Microsecond) + + // Make sure that all events have eventually been published + assert.Equal(t, numEvents.Load(), client.Published()) } type mockClient struct{ published int } From babb3986012a98d37ff46031d587487b45718217 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Apr 2020 22:11:16 -0700 Subject: [PATCH 04/36] Reorganizing imports --- libbeat/publisher/pipeline/output_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index f155173cce5..166c1070800 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -23,18 +23,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common/atomic" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" ) func TestPublish(t *testing.T) { From b7163ddb4a89ccb06bc27b9ad9e930d74f28d7a0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 3 Apr 2020 12:13:31 -0700 Subject: [PATCH 05/36] Output PRNG seed + provide flag to specify seed --- libbeat/publisher/pipeline/output_test.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 166c1070800..0d4bd08e104 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -18,6 +18,7 @@ package pipeline import ( + "flag" "math/rand" "sync" "testing" @@ -33,6 +34,10 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" ) +var ( + SeedFlag = flag.Int64("seed", 0, "Randomization seed") +) + func TestPublish(t *testing.T) { tests := map[string]struct { client outputs.Client @@ -47,7 +52,7 @@ func TestPublish(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - rand.Seed(time.Now().UnixNano()) + seedPRNG(t) wqu := makeWorkQueue() makeClientWorker(nilObserver, wqu, test.client) @@ -78,7 +83,7 @@ func TestPublish(t *testing.T) { } func TestPublishWithClose(t *testing.T) { - rand.Seed(time.Now().UnixNano()) + seedPRNG(t) wqu := makeWorkQueue() client := &mockNetworkClient{} @@ -179,3 +184,13 @@ func randomBatch(min, max int, wqu workQueue) *Batch { func randIntBetween(min, max int) int { return rand.Intn(max-min) + min } + +func seedPRNG(t *testing.T) { + seed := *SeedFlag + if seed == 0 { + seed = time.Now().UnixNano() + } + + t.Logf("seeding PRNG with %v", seed) + rand.Seed(seed) +} From 0034f4b050820a1dfd6c54c7240f098c372f2169 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 3 Apr 2020 12:18:38 -0700 Subject: [PATCH 06/36] Cancel batch with netClient if it is closed --- libbeat/publisher/pipeline/output.go | 7 +++ libbeat/publisher/pipeline/output_test.go | 69 ++++++++++++++--------- 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 07b27f4d3f3..02ec2975db6 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -67,6 +67,13 @@ func (w *clientWorker) Close() error { func (w *clientWorker) run() { for !w.closed.Load() { for batch := range w.qu { + if w.closed.Load() { + if batch != nil { + batch.Cancelled() + } + return + } + w.observer.outBatchSend(len(batch.events)) if err := w.client.Publish(batch); err != nil { diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 0d4bd08e104..2e8388ade41 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -83,42 +83,57 @@ func TestPublish(t *testing.T) { } func TestPublishWithClose(t *testing.T) { - seedPRNG(t) + tests := map[string]struct { + client outputs.Client + }{ + "client": { + &mockClient{}, + }, + "network_client": { + &mockNetworkClient{}, + }, + } - wqu := makeWorkQueue() - client := &mockNetworkClient{} - worker := makeClientWorker(nilObserver, wqu, client) + for name, test := range tests { + t.Run(name, func(t *testing.T) { + seedPRNG(t) - numEvents := atomic.MakeInt(0) - var wg sync.WaitGroup - for batchIdx := 0; batchIdx <= randIntBetween(25, 200); batchIdx++ { - wg.Add(1) - go func() { - defer wg.Done() - batch := randomBatch(50, 150, wqu) + wqu := makeWorkQueue() + worker := makeClientWorker(nilObserver, wqu, test.client) - numEvents.Add(len(batch.Events())) + numEvents := atomic.MakeInt(0) + var wg sync.WaitGroup + for batchIdx := 0; batchIdx <= randIntBetween(25, 200); batchIdx++ { + wg.Add(1) + go func() { + defer wg.Done() + batch := randomBatch(50, 150, wqu) - wqu <- batch - }() - } + numEvents.Add(len(batch.Events())) - // Close worker before all batches have had time to be published - err := worker.Close() - require.NoError(t, err) + wqu <- batch + }() + } - remaining := numEvents.Load() - client.Published() - assert.Greater(t, remaining, 0) + // Close worker before all batches have had time to be published + err := worker.Close() + require.NoError(t, err) - // Start new worker to drain work queue - makeClientWorker(nilObserver, wqu, client) - wg.Wait() + c := test.client.(interface{ Published() int }) + remaining := numEvents.Load() - c.Published() + assert.Greater(t, remaining, 0) - // Give some time for events to be published - time.Sleep(time.Duration(remaining*3) * time.Microsecond) + // Start new worker to drain work queue + makeClientWorker(nilObserver, wqu, test.client) + wg.Wait() + + // Give some time for events to be published + time.Sleep(time.Duration(remaining*3) * time.Microsecond) - // Make sure that all events have eventually been published - assert.Equal(t, numEvents.Load(), client.Published()) + // Make sure that all events have eventually been published + assert.Equal(t, numEvents.Load(), c.Published()) + }) + } } type mockClient struct{ published int } From c0ee69e843edb9244d37955fc3cc15cb4e2e7651 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 3 Apr 2020 12:28:29 -0700 Subject: [PATCH 07/36] Use waitUntil loop instead of hard sleep --- libbeat/publisher/pipeline/output_test.go | 24 +++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 2e8388ade41..6b683c7d23a 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -73,11 +73,13 @@ func TestPublish(t *testing.T) { wg.Wait() // Give some time for events to be published - time.Sleep(time.Duration(numEvents.Load()*3) * time.Microsecond) + timeout := time.Duration(numEvents.Load()*3) * time.Microsecond // Make sure that all events have eventually been published c := test.client.(interface{ Published() int }) - assert.Equal(t, numEvents.Load(), c.Published()) + require.True(t, waitUntilTrue(timeout, func() bool { + return numEvents.Load() == c.Published() + })) }) } } @@ -128,10 +130,13 @@ func TestPublishWithClose(t *testing.T) { wg.Wait() // Give some time for events to be published - time.Sleep(time.Duration(remaining*3) * time.Microsecond) + timeout := time.Duration(remaining*3) * time.Microsecond // Make sure that all events have eventually been published - assert.Equal(t, numEvents.Load(), c.Published()) + require.True(t, waitUntilTrue(timeout, func() bool { + return numEvents.Load() == c.Published() + })) + }) } } @@ -209,3 +214,14 @@ func seedPRNG(t *testing.T) { t.Logf("seeding PRNG with %v", seed) rand.Seed(seed) } + +func waitUntilTrue(duration time.Duration, fn func() bool) bool { + end := time.Now().Add(duration) + for time.Now().Before(end) { + if fn() { + return true + } + time.Sleep(100 * time.Nanosecond) + } + return false +} From a950b19d2c7c63de703816f717e367ba5f772517 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 3 Apr 2020 13:47:14 -0700 Subject: [PATCH 08/36] Making mockClient threadsafe --- libbeat/publisher/pipeline/output_test.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 6b683c7d23a..aa92dd60c2f 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -141,13 +141,24 @@ func TestPublishWithClose(t *testing.T) { } } -type mockClient struct{ published int } +type mockClient struct { + mu sync.RWMutex + published int +} + +func (c *mockClient) Published() int { + c.mu.RLock() + defer c.mu.RUnlock() -func (c *mockClient) Published() int { return c.published } + return c.published +} func (c *mockClient) String() string { return "mock_client" } func (c *mockClient) Close() error { return nil } func (c *mockClient) Publish(batch publisher.Batch) error { + c.mu.Lock() + defer c.mu.Unlock() + c.published += len(batch.Events()) return nil } From 8018169662459a90265d22b9e3c63de02f265576 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 3 Apr 2020 14:04:21 -0700 Subject: [PATCH 09/36] Removing goroutine from happy path unit test --- libbeat/publisher/pipeline/output_test.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index aa92dd60c2f..cea7ef6c1df 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -58,19 +58,11 @@ func TestPublish(t *testing.T) { makeClientWorker(nilObserver, wqu, test.client) numEvents := atomic.MakeInt(0) - var wg sync.WaitGroup for batchIdx := 0; batchIdx <= randIntBetween(25, 200); batchIdx++ { - wg.Add(1) - go func() { - defer wg.Done() - batch := randomBatch(50, 150, wqu) - - numEvents.Add(len(batch.Events())) - - wqu <- batch - }() + batch := randomBatch(50, 150, wqu) + numEvents.Add(len(batch.Events())) + wqu <- batch } - wg.Wait() // Give some time for events to be published timeout := time.Duration(numEvents.Load()*3) * time.Microsecond From a92ec73f2527aaff3e4c1b9d19dd905487c12d85 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 3 Apr 2020 15:04:13 -0700 Subject: [PATCH 10/36] Using testing/quick --- libbeat/publisher/pipeline/output_test.go | 168 ++++++++++++---------- 1 file changed, 94 insertions(+), 74 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index cea7ef6c1df..e1507419e40 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -22,6 +22,7 @@ import ( "math/rand" "sync" "testing" + "testing/quick" "time" "github.com/stretchr/testify/assert" @@ -39,100 +40,106 @@ var ( ) func TestPublish(t *testing.T) { - tests := map[string]struct { - client outputs.Client - }{ - "client": { - &mockClient{}, - }, - "network_client": { - &mockNetworkClient{}, - }, + tests := map[string]func() outputs.Client{ + "client": newMockClient, + "network_client": newMockNetworkClient, } - for name, test := range tests { + for name, ctor := range tests { t.Run(name, func(t *testing.T) { seedPRNG(t) - wqu := makeWorkQueue() - makeClientWorker(nilObserver, wqu, test.client) + err := quick.Check(func(i uint) bool { + numBatches := 100 + (i % 200) // between 100 and 299 - numEvents := atomic.MakeInt(0) - for batchIdx := 0; batchIdx <= randIntBetween(25, 200); batchIdx++ { - batch := randomBatch(50, 150, wqu) - numEvents.Add(len(batch.Events())) - wqu <- batch - } + client := ctor() + wqu := makeWorkQueue() + makeClientWorker(nilObserver, wqu, client) - // Give some time for events to be published - timeout := time.Duration(numEvents.Load()*3) * time.Microsecond + numEvents := atomic.MakeInt(0) + for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { + batch := randomBatch(50, 150, wqu) + numEvents.Add(len(batch.Events())) + wqu <- batch + } + + // Give some time for events to be published + timeout := time.Duration(numEvents.Load()*3) * time.Microsecond - // Make sure that all events have eventually been published - c := test.client.(interface{ Published() int }) - require.True(t, waitUntilTrue(timeout, func() bool { - return numEvents.Load() == c.Published() - })) + // Make sure that all events have eventually been published + c := client.(interface{ Published() int }) + return waitUntilTrue(timeout, func() bool { + return numEvents.Load() == c.Published() + }) + }, nil) + + if err != nil { + t.Error(err) + } }) } } func TestPublishWithClose(t *testing.T) { - tests := map[string]struct { - client outputs.Client - }{ - "client": { - &mockClient{}, - }, - "network_client": { - &mockNetworkClient{}, - }, + tests := map[string]func() outputs.Client{ + "client": newMockClient, + "network_client": newMockNetworkClient, } - for name, test := range tests { + for name, ctor := range tests { t.Run(name, func(t *testing.T) { seedPRNG(t) - wqu := makeWorkQueue() - worker := makeClientWorker(nilObserver, wqu, test.client) - - numEvents := atomic.MakeInt(0) - var wg sync.WaitGroup - for batchIdx := 0; batchIdx <= randIntBetween(25, 200); batchIdx++ { - wg.Add(1) - go func() { - defer wg.Done() - batch := randomBatch(50, 150, wqu) - - numEvents.Add(len(batch.Events())) - - wqu <- batch - }() + err := quick.Check(func(i uint) bool { + numBatches := 2000 + (i % 1000) // between 2000 and 2999 + + wqu := makeWorkQueue() + numEvents := atomic.MakeInt(0) + + var wg sync.WaitGroup + for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { + wg.Add(1) + go func() { + defer wg.Done() + batch := randomBatch(50, 150, wqu) + numEvents.Add(len(batch.Events())) + wqu <- batch + }() + } + + client := ctor() + worker := makeClientWorker(nilObserver, wqu, client) + + // Close worker before all batches have had time to be published + err := worker.Close() + require.NoError(t, err) + + c := client.(interface{ Published() int }) + remaining := numEvents.Load() - c.Published() + assert.Greater(t, remaining, 0) + + // Start new worker to drain work queue + makeClientWorker(nilObserver, wqu, client) + wg.Wait() + + // Give some time for events to be published + timeout := time.Duration(remaining*3) * time.Microsecond + + // Make sure that all events have eventually been published + return waitUntilTrue(timeout, func() bool { + return numEvents.Load() == c.Published() + }) + }, nil) + + if err != nil { + t.Error(err) } - - // Close worker before all batches have had time to be published - err := worker.Close() - require.NoError(t, err) - - c := test.client.(interface{ Published() int }) - remaining := numEvents.Load() - c.Published() - assert.Greater(t, remaining, 0) - - // Start new worker to drain work queue - makeClientWorker(nilObserver, wqu, test.client) - wg.Wait() - - // Give some time for events to be published - timeout := time.Duration(remaining*3) * time.Microsecond - - // Make sure that all events have eventually been published - require.True(t, waitUntilTrue(timeout, func() bool { - return numEvents.Load() == c.Published() - })) - }) } } +func newMockClient() outputs.Client { return &mockClient{} } + type mockClient struct { mu sync.RWMutex published int @@ -155,13 +162,26 @@ func (c *mockClient) Publish(batch publisher.Batch) error { return nil } -type mockNetworkClient struct{ published int } +func newMockNetworkClient() outputs.Client { return &mockNetworkClient{} } + +type mockNetworkClient struct { + mu sync.RWMutex + published int +} + +func (c *mockNetworkClient) Published() int { + c.mu.RLock() + defer c.mu.RUnlock() -func (c *mockNetworkClient) Published() int { return c.published } + return c.published +} func (c *mockNetworkClient) String() string { return "mock_network_client" } func (c *mockNetworkClient) Close() error { return nil } func (c *mockNetworkClient) Publish(batch publisher.Batch) error { + c.mu.RLock() + defer c.mu.RUnlock() + c.published += len(batch.Events()) return nil } @@ -214,7 +234,7 @@ func seedPRNG(t *testing.T) { seed = time.Now().UnixNano() } - t.Logf("seeding PRNG with %v", seed) + t.Logf("reproduce test with `go test ... -seed %v`", seed) rand.Seed(seed) } From 29f7d234d65cc882d2c75f98c0981f5ad2e1cd72 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 4 Apr 2020 05:08:13 -0700 Subject: [PATCH 11/36] Increase batch sizes in tests --- libbeat/publisher/pipeline/output_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index e1507419e40..8e0ce78a6c9 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -50,7 +50,7 @@ func TestPublish(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 100 + (i % 200) // between 100 and 299 + numBatches := 3000 + (i % 1000) // between 3000 and 3999 client := ctor() wqu := makeWorkQueue() @@ -91,7 +91,7 @@ func TestPublishWithClose(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 2000 + (i % 1000) // between 2000 and 2999 + numBatches := 3000 + (i % 1000) // between 3000 and 3999 wqu := makeWorkQueue() numEvents := atomic.MakeInt(0) From 9fa686df737795be1d98ef5e65172c7e72b7430e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sun, 5 Apr 2020 05:16:43 -0700 Subject: [PATCH 12/36] Adding sleep to ensure some batches are still at time of close --- libbeat/publisher/pipeline/output_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 8e0ce78a6c9..ae5cdc4e515 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -52,8 +52,8 @@ func TestPublish(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 3000 + (i % 1000) // between 3000 and 3999 - client := ctor() wqu := makeWorkQueue() + client := ctor() makeClientWorker(nilObserver, wqu, client) numEvents := atomic.MakeInt(0) @@ -94,6 +94,7 @@ func TestPublishWithClose(t *testing.T) { numBatches := 3000 + (i % 1000) // between 3000 and 3999 wqu := makeWorkQueue() + numEvents := atomic.MakeInt(0) var wg sync.WaitGroup @@ -101,9 +102,13 @@ func TestPublishWithClose(t *testing.T) { wg.Add(1) go func() { defer wg.Done() + batch := randomBatch(50, 150, wqu) numEvents.Add(len(batch.Events())) wqu <- batch + + // To ensure worker doesn't have time to publish all batches + time.Sleep(750 * time.Microsecond) }() } From 3b71cf14de1081da93aaaa0cfdad8705dd21a9b1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sun, 5 Apr 2020 05:55:43 -0700 Subject: [PATCH 13/36] Experiment witht with slihigher sleep time --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index ae5cdc4e515..9943dc73796 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -108,7 +108,7 @@ func TestPublishWithClose(t *testing.T) { wqu <- batch // To ensure worker doesn't have time to publish all batches - time.Sleep(750 * time.Microsecond) + time.Sleep(1 * time.Millisecond) }() } From eb04fce9bd414df5316b86df023689fb8e5bf366 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sun, 5 Apr 2020 06:05:42 -0700 Subject: [PATCH 14/36] Moving sleep to publish time --- libbeat/publisher/pipeline/output_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 9943dc73796..530882bbe59 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -106,9 +106,6 @@ func TestPublishWithClose(t *testing.T) { batch := randomBatch(50, 150, wqu) numEvents.Add(len(batch.Events())) wqu <- batch - - // To ensure worker doesn't have time to publish all batches - time.Sleep(1 * time.Millisecond) }() } @@ -143,6 +140,8 @@ func TestPublishWithClose(t *testing.T) { } } +const publishLatency = 1 * time.Microsecond + func newMockClient() outputs.Client { return &mockClient{} } type mockClient struct { @@ -163,6 +162,7 @@ func (c *mockClient) Publish(batch publisher.Batch) error { c.mu.Lock() defer c.mu.Unlock() + time.Sleep(publishLatency) c.published += len(batch.Events()) return nil } @@ -187,6 +187,7 @@ func (c *mockNetworkClient) Publish(batch publisher.Batch) error { c.mu.RLock() defer c.mu.RUnlock() + time.Sleep(publishLatency) c.published += len(batch.Events()) return nil } From 4f14aec8de46c413fa95b6dd21660cc804b35011 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sun, 5 Apr 2020 06:41:44 -0700 Subject: [PATCH 15/36] Increase publish latency --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 530882bbe59..a02d51f9570 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -140,7 +140,7 @@ func TestPublishWithClose(t *testing.T) { } } -const publishLatency = 1 * time.Microsecond +const publishLatency = 5 * time.Microsecond func newMockClient() outputs.Client { return &mockClient{} } From c0de546e79885057f8cfbbaab08a4a4df2d75e70 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sun, 5 Apr 2020 16:56:20 -0700 Subject: [PATCH 16/36] Increasing publish latency again --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index a02d51f9570..4c00d98538e 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -140,7 +140,7 @@ func TestPublishWithClose(t *testing.T) { } } -const publishLatency = 5 * time.Microsecond +const publishLatency = 50 * time.Microsecond func newMockClient() outputs.Client { return &mockClient{} } From c2a0deccfe8a81efe4aa1f8fec1f4c669e73c4da Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sun, 5 Apr 2020 17:31:52 -0700 Subject: [PATCH 17/36] Removing publishLatency --- libbeat/publisher/pipeline/output_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 4c00d98538e..f5a4957e970 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -140,8 +140,6 @@ func TestPublishWithClose(t *testing.T) { } } -const publishLatency = 50 * time.Microsecond - func newMockClient() outputs.Client { return &mockClient{} } type mockClient struct { @@ -162,7 +160,6 @@ func (c *mockClient) Publish(batch publisher.Batch) error { c.mu.Lock() defer c.mu.Unlock() - time.Sleep(publishLatency) c.published += len(batch.Events()) return nil } @@ -187,7 +184,6 @@ func (c *mockNetworkClient) Publish(batch publisher.Batch) error { c.mu.RLock() defer c.mu.RUnlock() - time.Sleep(publishLatency) c.published += len(batch.Events()) return nil } From 2352fc36d3d3dbf23260bc9d4cf5710e66565279 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 6 Apr 2020 16:30:57 -0700 Subject: [PATCH 18/36] Fix timeout to large value --- libbeat/publisher/pipeline/output_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index f5a4957e970..69b0e08c4cb 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -64,7 +64,7 @@ func TestPublish(t *testing.T) { } // Give some time for events to be published - timeout := time.Duration(numEvents.Load()*3) * time.Microsecond + timeout := 20 * time.Second // Make sure that all events have eventually been published c := client.(interface{ Published() int }) @@ -125,7 +125,7 @@ func TestPublishWithClose(t *testing.T) { wg.Wait() // Give some time for events to be published - timeout := time.Duration(remaining*3) * time.Microsecond + timeout := 20 * time.Second // Make sure that all events have eventually been published return waitUntilTrue(timeout, func() bool { From ee896ebf899e9c76cf0b692d0d39f4058916a32a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 6 Apr 2020 17:03:48 -0700 Subject: [PATCH 19/36] Make first client block after publishing X events --- libbeat/publisher/pipeline/output_test.go | 61 +++++++++++++++-------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 69b0e08c4cb..aff78cfa731 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -40,7 +40,7 @@ var ( ) func TestPublish(t *testing.T) { - tests := map[string]func() outputs.Client{ + tests := map[string]func(uint) outputs.Client{ "client": newMockClient, "network_client": newMockNetworkClient, } @@ -53,7 +53,7 @@ func TestPublish(t *testing.T) { numBatches := 3000 + (i % 1000) // between 3000 and 3999 wqu := makeWorkQueue() - client := ctor() + client := ctor(0) makeClientWorker(nilObserver, wqu, client) numEvents := atomic.MakeInt(0) @@ -81,7 +81,7 @@ func TestPublish(t *testing.T) { } func TestPublishWithClose(t *testing.T) { - tests := map[string]func() outputs.Client{ + tests := map[string]func(uint) outputs.Client{ "client": newMockClient, "network_client": newMockNetworkClient, } @@ -94,8 +94,7 @@ func TestPublishWithClose(t *testing.T) { numBatches := 3000 + (i % 1000) // between 3000 and 3999 wqu := makeWorkQueue() - - numEvents := atomic.MakeInt(0) + numEvents := atomic.MakeUint(0) var wg sync.WaitGroup for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { @@ -104,23 +103,24 @@ func TestPublishWithClose(t *testing.T) { defer wg.Done() batch := randomBatch(50, 150, wqu) - numEvents.Add(len(batch.Events())) + numEvents.Add(uint(len(batch.Events()))) wqu <- batch }() } - client := ctor() + client := ctor(numEvents.Load() / 2) // Stop short of publishing all events worker := makeClientWorker(nilObserver, wqu, client) // Close worker before all batches have had time to be published err := worker.Close() require.NoError(t, err) - c := client.(interface{ Published() int }) - remaining := numEvents.Load() - c.Published() - assert.Greater(t, remaining, 0) + c := client.(interface{ Published() uint }) + published := c.Published() + assert.Less(t, published, numEvents.Load()) // Start new worker to drain work queue + client = ctor(0) makeClientWorker(nilObserver, wqu, client) wg.Wait() @@ -129,7 +129,8 @@ func TestPublishWithClose(t *testing.T) { // Make sure that all events have eventually been published return waitUntilTrue(timeout, func() bool { - return numEvents.Load() == c.Published() + c = client.(interface{ Published() uint }) + return numEvents.Load() == c.Published()+published }) }, nil) @@ -140,14 +141,17 @@ func TestPublishWithClose(t *testing.T) { } } -func newMockClient() outputs.Client { return &mockClient{} } +func newMockClient(publishLimit uint) outputs.Client { + return &mockClient{publishLimit: publishLimit} +} type mockClient struct { - mu sync.RWMutex - published int + mu sync.RWMutex + publishLimit uint + published uint } -func (c *mockClient) Published() int { +func (c *mockClient) Published() uint { c.mu.RLock() defer c.mu.RUnlock() @@ -160,18 +164,27 @@ func (c *mockClient) Publish(batch publisher.Batch) error { c.mu.Lock() defer c.mu.Unlock() - c.published += len(batch.Events()) + // Simulate blocking (not publishing) + if c.publishLimit > 0 && c.published >= c.publishLimit { + batch.Cancelled() + return nil + } + + c.published += uint(len(batch.Events())) return nil } -func newMockNetworkClient() outputs.Client { return &mockNetworkClient{} } +func newMockNetworkClient(publishLimit uint) outputs.Client { + return &mockNetworkClient{publishLimit: publishLimit} +} type mockNetworkClient struct { - mu sync.RWMutex - published int + mu sync.RWMutex + publishLimit uint + published uint } -func (c *mockNetworkClient) Published() int { +func (c *mockNetworkClient) Published() uint { c.mu.RLock() defer c.mu.RUnlock() @@ -184,7 +197,13 @@ func (c *mockNetworkClient) Publish(batch publisher.Batch) error { c.mu.RLock() defer c.mu.RUnlock() - c.published += len(batch.Events()) + // Simulate blocking (not publishing) + if c.publishLimit > 0 && c.published >= c.publishLimit { + batch.Cancelled() + return nil + } + + c.published += uint(len(batch.Events())) return nil } func (c *mockNetworkClient) Connect() error { return nil } From 06985d69fc212772c780733d2d5f558fc31a332e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 6 Apr 2020 18:31:50 -0700 Subject: [PATCH 20/36] Actually block publishing --- libbeat/publisher/pipeline/output_test.go | 58 +++++++---------------- 1 file changed, 18 insertions(+), 40 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index aff78cfa731..dfb20af4bf9 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -40,7 +40,7 @@ var ( ) func TestPublish(t *testing.T) { - tests := map[string]func(uint) outputs.Client{ + tests := map[string]func(uint) publishCountable{ "client": newMockClient, "network_client": newMockNetworkClient, } @@ -56,10 +56,10 @@ func TestPublish(t *testing.T) { client := ctor(0) makeClientWorker(nilObserver, wqu, client) - numEvents := atomic.MakeInt(0) + numEvents := atomic.MakeUint(0) for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { batch := randomBatch(50, 150, wqu) - numEvents.Add(len(batch.Events())) + numEvents.Add(uint(len(batch.Events()))) wqu <- batch } @@ -67,9 +67,8 @@ func TestPublish(t *testing.T) { timeout := 20 * time.Second // Make sure that all events have eventually been published - c := client.(interface{ Published() int }) return waitUntilTrue(timeout, func() bool { - return numEvents.Load() == c.Published() + return numEvents.Load() == client.Published() }) }, nil) @@ -81,7 +80,7 @@ func TestPublish(t *testing.T) { } func TestPublishWithClose(t *testing.T) { - tests := map[string]func(uint) outputs.Client{ + tests := map[string]func(uint) publishCountable{ "client": newMockClient, "network_client": newMockNetworkClient, } @@ -115,8 +114,7 @@ func TestPublishWithClose(t *testing.T) { err := worker.Close() require.NoError(t, err) - c := client.(interface{ Published() uint }) - published := c.Published() + published := client.Published() assert.Less(t, published, numEvents.Load()) // Start new worker to drain work queue @@ -129,8 +127,7 @@ func TestPublishWithClose(t *testing.T) { // Make sure that all events have eventually been published return waitUntilTrue(timeout, func() bool { - c = client.(interface{ Published() uint }) - return numEvents.Load() == c.Published()+published + return numEvents.Load() == client.Published()+published }) }, nil) @@ -141,7 +138,12 @@ func TestPublishWithClose(t *testing.T) { } } -func newMockClient(publishLimit uint) outputs.Client { +type publishCountable interface { + outputs.Client + Published() uint +} + +func newMockClient(publishLimit uint) publishCountable { return &mockClient{publishLimit: publishLimit} } @@ -164,9 +166,9 @@ func (c *mockClient) Publish(batch publisher.Batch) error { c.mu.Lock() defer c.mu.Unlock() - // Simulate blocking (not publishing) + // Block publishing if c.publishLimit > 0 && c.published >= c.publishLimit { - batch.Cancelled() + time.Sleep(10 * time.Second) return nil } @@ -174,38 +176,14 @@ func (c *mockClient) Publish(batch publisher.Batch) error { return nil } -func newMockNetworkClient(publishLimit uint) outputs.Client { - return &mockNetworkClient{publishLimit: publishLimit} +func newMockNetworkClient(publishLimit uint) publishCountable { + return &mockNetworkClient{newMockClient(publishLimit)} } type mockNetworkClient struct { - mu sync.RWMutex - publishLimit uint - published uint -} - -func (c *mockNetworkClient) Published() uint { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.published + publishCountable } -func (c *mockNetworkClient) String() string { return "mock_network_client" } -func (c *mockNetworkClient) Close() error { return nil } -func (c *mockNetworkClient) Publish(batch publisher.Batch) error { - c.mu.RLock() - defer c.mu.RUnlock() - - // Simulate blocking (not publishing) - if c.publishLimit > 0 && c.published >= c.publishLimit { - batch.Cancelled() - return nil - } - - c.published += uint(len(batch.Events())) - return nil -} func (c *mockNetworkClient) Connect() error { return nil } type mockQueue struct{} From 66942f15d13f7831b584727c9c02b1f056bcc51f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 6 Apr 2020 19:11:58 -0700 Subject: [PATCH 21/36] Reduce number of batches to prevent running out of memory --- libbeat/publisher/pipeline/output_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index dfb20af4bf9..cb2c4f9eacf 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -50,7 +50,7 @@ func TestPublish(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 3000 + (i % 1000) // between 3000 and 3999 + numBatches := 300 + (i % 100) // between 300 and 399 wqu := makeWorkQueue() client := ctor(0) @@ -90,7 +90,7 @@ func TestPublishWithClose(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 3000 + (i % 1000) // between 3000 and 3999 + numBatches := 300 + (i % 100) // between 300 and 399 wqu := makeWorkQueue() numEvents := atomic.MakeUint(0) From d936aee8cdf830535e9170e3d863d1af11aa64ac Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 6 Apr 2020 19:50:48 -0700 Subject: [PATCH 22/36] Bumping up # of batches --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index cb2c4f9eacf..2fa908e7bf5 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -90,7 +90,7 @@ func TestPublishWithClose(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 300 + (i % 100) // between 300 and 399 + numBatches := 600 + (i % 100) // between 600 and 699 wqu := makeWorkQueue() numEvents := atomic.MakeUint(0) From 1813f124c9318c6fc96e56bbc79a9998c00e3c7c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 6 Apr 2020 23:56:41 -0700 Subject: [PATCH 23/36] Bumping up # of batches again --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 2fa908e7bf5..9cbd494c09f 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -90,7 +90,7 @@ func TestPublishWithClose(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 600 + (i % 100) // between 600 and 699 + numBatches := 1000 + (i % 100) // between 1000 and 1099 wqu := makeWorkQueue() numEvents := atomic.MakeUint(0) From de3d3f6ecd288079e7719be1adb16e8530337e5f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 00:24:59 -0700 Subject: [PATCH 24/36] Try different strategy - publish 80% of events --- libbeat/publisher/pipeline/output_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 9cbd494c09f..6144ce1e17c 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -19,6 +19,7 @@ package pipeline import ( "flag" + "math" "math/rand" "sync" "testing" @@ -107,7 +108,7 @@ func TestPublishWithClose(t *testing.T) { }() } - client := ctor(numEvents.Load() / 2) // Stop short of publishing all events + client := ctor(uint(math.Floor(float64(numEvents.Load()) * 0.8))) // Stop short of publishing all events worker := makeClientWorker(nilObserver, wqu, client) // Close worker before all batches have had time to be published From e8cde93a451200a32baa4e1faa823cd962ace680 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 03:19:02 -0700 Subject: [PATCH 25/36] Cranking up sleep time in publish blocking --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 6144ce1e17c..bc252047a11 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -169,7 +169,7 @@ func (c *mockClient) Publish(batch publisher.Batch) error { // Block publishing if c.publishLimit > 0 && c.published >= c.publishLimit { - time.Sleep(10 * time.Second) + time.Sleep(30 * time.Second) return nil } From 6f89e195cfa4a01e01ab5996ded34853f7c74f17 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 04:17:26 -0700 Subject: [PATCH 26/36] Only publish first 20% of events --- libbeat/publisher/pipeline/output_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index bc252047a11..08261eb5393 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -108,7 +108,8 @@ func TestPublishWithClose(t *testing.T) { }() } - client := ctor(uint(math.Floor(float64(numEvents.Load()) * 0.8))) // Stop short of publishing all events + publishLimit := math.Floor(float64(numEvents.Load()) * 0.2) // Only publish first 20% of events + client := ctor(uint(publishLimit)) worker := makeClientWorker(nilObserver, wqu, client) // Close worker before all batches have had time to be published From 915e6ce63636cd1d0de5d5ebb9e117fb5cb61be4 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 05:08:23 -0700 Subject: [PATCH 27/36] Make sure to return batch for retrying --- libbeat/publisher/pipeline/output_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 08261eb5393..b0338b92754 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -108,7 +108,7 @@ func TestPublishWithClose(t *testing.T) { }() } - publishLimit := math.Floor(float64(numEvents.Load()) * 0.2) // Only publish first 20% of events + publishLimit := math.Floor(float64(numEvents.Load()) * 0.2) // Only publish up to first 20% of events client := ctor(uint(publishLimit)) worker := makeClientWorker(nilObserver, wqu, client) @@ -124,12 +124,11 @@ func TestPublishWithClose(t *testing.T) { makeClientWorker(nilObserver, wqu, client) wg.Wait() - // Give some time for events to be published - timeout := 20 * time.Second - // Make sure that all events have eventually been published + timeout := 20 * time.Second return waitUntilTrue(timeout, func() bool { - return numEvents.Load() == client.Published()+published + total := published + client.Published() + return numEvents.Load() == total }) }, nil) @@ -170,7 +169,8 @@ func (c *mockClient) Publish(batch publisher.Batch) error { // Block publishing if c.publishLimit > 0 && c.published >= c.publishLimit { - time.Sleep(30 * time.Second) + batch.Retry() // to simulate not acking + time.Sleep(10 * time.Second) // block long enough for test return nil } From 8bb19eecaaa7c7c488cb55cc57400037f98898ff Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 08:57:30 -0700 Subject: [PATCH 28/36] Adding debugging statements to see what's happening in Travis --- libbeat/publisher/pipeline/output_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index b0338b92754..07728f41be0 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -19,6 +19,7 @@ package pipeline import ( "flag" + "fmt" "math" "math/rand" "sync" @@ -109,6 +110,7 @@ func TestPublishWithClose(t *testing.T) { } publishLimit := math.Floor(float64(numEvents.Load()) * 0.2) // Only publish up to first 20% of events + fmt.Printf("[%v] publishLimit = %v\n", name, publishLimit) client := ctor(uint(publishLimit)) worker := makeClientWorker(nilObserver, wqu, client) @@ -117,6 +119,7 @@ func TestPublishWithClose(t *testing.T) { require.NoError(t, err) published := client.Published() + fmt.Printf("[%v] published = %v, total = %v\n", name, published, numEvents.Load()) assert.Less(t, published, numEvents.Load()) // Start new worker to drain work queue From 57b527ba7f71b1ea2c1f896b55392fb37a1b7f0e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 09:29:26 -0700 Subject: [PATCH 29/36] More robust to race conditions --- libbeat/publisher/pipeline/output_test.go | 30 ++++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 07728f41be0..cf892d9aa31 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -19,7 +19,6 @@ package pipeline import ( "flag" - "fmt" "math" "math/rand" "sync" @@ -27,7 +26,6 @@ import ( "testing/quick" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common/atomic" @@ -87,6 +85,8 @@ func TestPublishWithClose(t *testing.T) { "network_client": newMockNetworkClient, } + const minEventsInBatch = 50 + for name, ctor := range tests { t.Run(name, func(t *testing.T) { seedPRNG(t) @@ -103,24 +103,31 @@ func TestPublishWithClose(t *testing.T) { go func() { defer wg.Done() - batch := randomBatch(50, 150, wqu) + batch := randomBatch(minEventsInBatch, 150, wqu) numEvents.Add(uint(len(batch.Events()))) wqu <- batch }() } - publishLimit := math.Floor(float64(numEvents.Load()) * 0.2) // Only publish up to first 20% of events - fmt.Printf("[%v] publishLimit = %v\n", name, publishLimit) - client := ctor(uint(publishLimit)) + // Publish at least 1 batch worth of events but no more than 20% events + publishLimit := uint(math.Min(minEventsInBatch, float64(numEvents.Load())*0.2)) + client := ctor(publishLimit) worker := makeClientWorker(nilObserver, wqu, client) + // Allow the worker to make *some* progress before we close it + timeout := 10 * time.Second + progress := waitUntilTrue(timeout, func() bool { + return client.Published() >= publishLimit + }) + if !progress { + return false + } + // Close worker before all batches have had time to be published err := worker.Close() require.NoError(t, err) published := client.Published() - fmt.Printf("[%v] published = %v, total = %v\n", name, published, numEvents.Load()) - assert.Less(t, published, numEvents.Load()) // Start new worker to drain work queue client = ctor(0) @@ -128,7 +135,7 @@ func TestPublishWithClose(t *testing.T) { wg.Wait() // Make sure that all events have eventually been published - timeout := 20 * time.Second + timeout = 20 * time.Second return waitUntilTrue(timeout, func() bool { total := published + client.Published() return numEvents.Load() == total @@ -172,8 +179,7 @@ func (c *mockClient) Publish(batch publisher.Batch) error { // Block publishing if c.publishLimit > 0 && c.published >= c.publishLimit { - batch.Retry() // to simulate not acking - time.Sleep(10 * time.Second) // block long enough for test + batch.Retry() // to simulate not acking return nil } @@ -248,7 +254,7 @@ func waitUntilTrue(duration time.Duration, fn func() bool) bool { if fn() { return true } - time.Sleep(100 * time.Nanosecond) + time.Sleep(1 * time.Millisecond) } return false } From 310dc4a350dc13c77a620a0b3594850466976c1b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 10:29:40 -0700 Subject: [PATCH 30/36] Restricting quick iterations to 50 to see if that helps in Travis --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index cf892d9aa31..320d8a15d8c 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -140,7 +140,7 @@ func TestPublishWithClose(t *testing.T) { total := published + client.Published() return numEvents.Load() == total }) - }, nil) + }, &quick.Config{MaxCount: 50}) if err != nil { t.Error(err) From 94f3445114fa98b276625092167dbb1d2c2a4f52 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 15:04:34 -0700 Subject: [PATCH 31/36] Put entire loop into goroutine --- libbeat/publisher/pipeline/output_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 320d8a15d8c..0dc678ef82c 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -98,16 +98,15 @@ func TestPublishWithClose(t *testing.T) { numEvents := atomic.MakeUint(0) var wg sync.WaitGroup - for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { - wg.Add(1) - go func() { - defer wg.Done() - + wg.Add(1) + go func() { + defer wg.Done() + for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { batch := randomBatch(minEventsInBatch, 150, wqu) numEvents.Add(uint(len(batch.Events()))) wqu <- batch - }() - } + } + }() // Publish at least 1 batch worth of events but no more than 20% events publishLimit := uint(math.Min(minEventsInBatch, float64(numEvents.Load())*0.2)) From be4bf09cc58c977dc534507c5c8f472d0b800e02 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 15:07:21 -0700 Subject: [PATCH 32/36] Renaming tests --- libbeat/publisher/pipeline/output_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 0dc678ef82c..2d6d3352d3f 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -39,7 +39,7 @@ var ( SeedFlag = flag.Int64("seed", 0, "Randomization seed") ) -func TestPublish(t *testing.T) { +func TestMakeClientWorker(t *testing.T) { tests := map[string]func(uint) publishCountable{ "client": newMockClient, "network_client": newMockNetworkClient, @@ -79,7 +79,7 @@ func TestPublish(t *testing.T) { } } -func TestPublishWithClose(t *testing.T) { +func TestMakeClientWorkerAndClose(t *testing.T) { tests := map[string]func(uint) publishCountable{ "client": newMockClient, "network_client": newMockNetworkClient, From 508b60687383f5cbedbbe8c7a7686f8082844a8a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 7 Apr 2020 16:35:36 -0700 Subject: [PATCH 33/36] Emulate blocking + mockable publish behavior --- libbeat/publisher/pipeline/output_test.go | 87 ++++++++++++----------- 1 file changed, 45 insertions(+), 42 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 2d6d3352d3f..536dec97bf0 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -40,7 +40,7 @@ var ( ) func TestMakeClientWorker(t *testing.T) { - tests := map[string]func(uint) publishCountable{ + tests := map[string]func(mockPublishFn) outputs.Client{ "client": newMockClient, "network_client": newMockNetworkClient, } @@ -52,8 +52,14 @@ func TestMakeClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 300 + (i % 100) // between 300 and 399 + var published atomic.Uint + publishFn := func(batch publisher.Batch) error { + published.Add(uint(len(batch.Events()))) + return nil + } + wqu := makeWorkQueue() - client := ctor(0) + client := ctor(publishFn) makeClientWorker(nilObserver, wqu, client) numEvents := atomic.MakeUint(0) @@ -68,7 +74,7 @@ func TestMakeClientWorker(t *testing.T) { // Make sure that all events have eventually been published return waitUntilTrue(timeout, func() bool { - return numEvents.Load() == client.Published() + return numEvents == published }) }, nil) @@ -80,7 +86,7 @@ func TestMakeClientWorker(t *testing.T) { } func TestMakeClientWorkerAndClose(t *testing.T) { - tests := map[string]func(uint) publishCountable{ + tests := map[string]func(mockPublishFn) outputs.Client{ "client": newMockClient, "network_client": newMockNetworkClient, } @@ -109,14 +115,29 @@ func TestMakeClientWorkerAndClose(t *testing.T) { }() // Publish at least 1 batch worth of events but no more than 20% events - publishLimit := uint(math.Min(minEventsInBatch, float64(numEvents.Load())*0.2)) - client := ctor(publishLimit) + publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents.Load())*0.2)) + + var publishedFirst atomic.Uint + blockCtrl := make(chan struct{}) + blockingPublishFn := func(batch publisher.Batch) error { + // Emulate blocking + if publishedFirst.Load() >= publishLimit { + batch.Retry() + <-blockCtrl + return nil + } + + publishedFirst.Add(uint(len(batch.Events()))) + return nil + } + + client := ctor(blockingPublishFn) worker := makeClientWorker(nilObserver, wqu, client) // Allow the worker to make *some* progress before we close it timeout := 10 * time.Second progress := waitUntilTrue(timeout, func() bool { - return client.Published() >= publishLimit + return publishedFirst.Load() >= publishLimit }) if !progress { return false @@ -125,19 +146,23 @@ func TestMakeClientWorkerAndClose(t *testing.T) { // Close worker before all batches have had time to be published err := worker.Close() require.NoError(t, err) - - published := client.Published() + close(blockCtrl) // Start new worker to drain work queue - client = ctor(0) + var publishedLater atomic.Uint + countingPublishFn := func(batch publisher.Batch) error { + publishedLater.Add(uint(len(batch.Events()))) + return nil + } + + client = ctor(countingPublishFn) makeClientWorker(nilObserver, wqu, client) wg.Wait() // Make sure that all events have eventually been published timeout = 20 * time.Second return waitUntilTrue(timeout, func() bool { - total := published + client.Published() - return numEvents.Load() == total + return numEvents.Load() == publishedFirst.Load()+publishedLater.Load() }) }, &quick.Config{MaxCount: 50}) @@ -148,50 +173,28 @@ func TestMakeClientWorkerAndClose(t *testing.T) { } } -type publishCountable interface { - outputs.Client - Published() uint -} +type mockPublishFn func(publisher.Batch) error -func newMockClient(publishLimit uint) publishCountable { - return &mockClient{publishLimit: publishLimit} +func newMockClient(publishFn mockPublishFn) outputs.Client { + return &mockClient{publishFn: publishFn} } type mockClient struct { - mu sync.RWMutex - publishLimit uint - published uint -} - -func (c *mockClient) Published() uint { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.published + publishFn mockPublishFn } func (c *mockClient) String() string { return "mock_client" } func (c *mockClient) Close() error { return nil } func (c *mockClient) Publish(batch publisher.Batch) error { - c.mu.Lock() - defer c.mu.Unlock() - - // Block publishing - if c.publishLimit > 0 && c.published >= c.publishLimit { - batch.Retry() // to simulate not acking - return nil - } - - c.published += uint(len(batch.Events())) - return nil + return c.publishFn(batch) } -func newMockNetworkClient(publishLimit uint) publishCountable { - return &mockNetworkClient{newMockClient(publishLimit)} +func newMockNetworkClient(publishFn mockPublishFn) outputs.Client { + return &mockNetworkClient{newMockClient(publishFn)} } type mockNetworkClient struct { - publishCountable + outputs.Client } func (c *mockNetworkClient) Connect() error { return nil } From 457434f4fbc3171ceb21c9d7bcb83579831b579e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 8 Apr 2020 04:07:15 -0700 Subject: [PATCH 34/36] Removing retry and return --- libbeat/publisher/pipeline/output_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 536dec97bf0..95b269e45c2 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -122,9 +122,7 @@ func TestMakeClientWorkerAndClose(t *testing.T) { blockingPublishFn := func(batch publisher.Batch) error { // Emulate blocking if publishedFirst.Load() >= publishLimit { - batch.Retry() <-blockCtrl - return nil } publishedFirst.Add(uint(len(batch.Events()))) From 3b3de96507e855515c859614f6ec31377d4ca256 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 8 Apr 2020 04:13:39 -0700 Subject: [PATCH 35/36] Clarify intent with comment --- libbeat/publisher/pipeline/output_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 95b269e45c2..2a23fb16975 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -120,7 +120,8 @@ func TestMakeClientWorkerAndClose(t *testing.T) { var publishedFirst atomic.Uint blockCtrl := make(chan struct{}) blockingPublishFn := func(batch publisher.Batch) error { - // Emulate blocking + // Emulate blocking. Upon unblocking the in-flight batch that was + // blocked is published. if publishedFirst.Load() >= publishLimit { <-blockCtrl } From 6a8793f2164ab339e93126e7894b98f0a66a1846 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 8 Apr 2020 13:37:56 -0700 Subject: [PATCH 36/36] Setting # of quick iterations to 25 --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 2a23fb16975..d89c166ee15 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -163,7 +163,7 @@ func TestMakeClientWorkerAndClose(t *testing.T) { return waitUntilTrue(timeout, func() bool { return numEvents.Load() == publishedFirst.Load()+publishedLater.Load() }) - }, &quick.Config{MaxCount: 50}) + }, &quick.Config{MaxCount: 25}) if err != nil { t.Error(err)