From c6673ad0dc26c0754be78250bf30a7e4655b9da0 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 27 Mar 2023 20:21:36 +0100 Subject: [PATCH 1/4] Fix intermittent CI failure in EmptyQueue The ordering of the final token causing a close of the queue in this test may be out of sync due to concurrency. Instead just use ensure that the queue is closed when everything expected is done. Signed-off-by: Andrew Thornton --- .../queue/unique_queue_disk_channel_test.go | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index fd76163f4aaca..8653d876a26fb 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -97,7 +98,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { executedInitial := map[string][]string{} hasInitial := map[string][]string{} - fillQueue := func(name string, done chan struct{}) { + fillQueue := func(name string, done chan int) { t.Run("Initial Filling: "+name, func(t *testing.T) { lock := sync.Mutex{} @@ -154,21 +155,22 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name])) mapLock.Unlock() }) + done <- len(hasInitial[name]) close(done) } - doneA := make(chan struct{}) - doneB := make(chan struct{}) + hasQueueAChan := make(chan int) + hasQueueBChan := make(chan int) - go fillQueue("QueueA", doneA) - go fillQueue("QueueB", doneB) + go fillQueue("QueueA", hasQueueAChan) + go fillQueue("QueueB", hasQueueBChan) - <-doneA - <-doneB + hasA := <-hasQueueAChan + hasB := <-hasQueueBChan executedEmpty := map[string][]string{} hasEmpty := map[string][]string{} - emptyQueue := func(name string, done chan struct{}) { + emptyQueue := func(name string, numInQueue int64, done chan struct{}) { t.Run("Empty Queue: "+name, func(t *testing.T) { lock := sync.Mutex{} stop := make(chan struct{}) @@ -176,11 +178,13 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { // collect the tasks that have been executed handle := func(data ...Data) []Data { lock.Lock() + i := int64(0) for _, datum := range data { mapLock.Lock() executedEmpty[name] = append(executedEmpty[name], datum.(string)) mapLock.Unlock() - if datum.(string) == "final" { + count := atomic.AddInt64(&i, 1) + if count >= numInQueue { close(stop) } } @@ -214,11 +218,11 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { close(done) } - doneA = make(chan struct{}) - doneB = make(chan struct{}) + doneA := make(chan struct{}) + doneB := make(chan struct{}) - go emptyQueue("QueueA", doneA) - go emptyQueue("QueueB", doneB) + go emptyQueue("QueueA", int64(hasA), doneA) + go emptyQueue("QueueB", int64(hasB), doneB) <-doneA <-doneB @@ -234,20 +238,20 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { hasEmpty = map[string][]string{} mapLock.Unlock() - doneA = make(chan struct{}) - doneB = make(chan struct{}) + hasQueueAChan = make(chan int) + hasQueueBChan = make(chan int) - go fillQueue("QueueA", doneA) - go fillQueue("QueueB", doneB) + go fillQueue("QueueA", hasQueueAChan) + go fillQueue("QueueB", hasQueueBChan) - <-doneA - <-doneB + hasA = <-hasQueueAChan + hasB = <-hasQueueBChan doneA = make(chan struct{}) doneB = make(chan struct{}) - go emptyQueue("QueueA", doneA) - go emptyQueue("QueueB", doneB) + go emptyQueue("QueueA", int64(hasA), doneA) + go emptyQueue("QueueB", int64(hasB), doneB) <-doneA <-doneB From 80cd8bea7bd8acf2fea9f609e1bee328d96b6240 Mon Sep 17 00:00:00 2001 From: zeripath Date: Tue, 28 Mar 2023 07:36:45 +0100 Subject: [PATCH 2/4] Update modules/queue/unique_queue_disk_channel_test.go Co-authored-by: KN4CK3R --- modules/queue/unique_queue_disk_channel_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index 8653d876a26fb..0be8f2bb88d4c 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -155,7 +155,9 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name])) mapLock.Unlock() }) + mapLock.Lock() done <- len(hasInitial[name]) + mapLock.Unlock() close(done) } From 3cf4224aca7511d1096b359c18e880ee4a4510a1 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 3 May 2023 21:07:10 +0100 Subject: [PATCH 3/4] fix tests again Signed-off-by: Andrew Thornton --- .../queue/unique_queue_disk_channel_test.go | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index 5e33defd373ce..d6856799a5afa 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -101,7 +101,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { executedInitial := map[string][]string{} hasInitial := map[string][]string{} - fillQueue := func(name string, done chan int) { + fillQueue := func(name string, done chan int64) { t.Run("Initial Filling: "+name, func(t *testing.T) { lock := sync.Mutex{} @@ -159,13 +159,14 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { mapLock.Unlock() }) mapLock.Lock() - done <- len(hasInitial[name]) + count := int64(len(hasInitial[name])) mapLock.Unlock() + done <- count close(done) } - hasQueueAChan := make(chan int) - hasQueueBChan := make(chan int) + hasQueueAChan := make(chan int64) + hasQueueBChan := make(chan int64) go fillQueue("QueueA", hasQueueAChan) go fillQueue("QueueB", hasQueueBChan) @@ -181,14 +182,14 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { stop := make(chan struct{}) // collect the tasks that have been executed + atomicCount := int64(0) handle := func(data ...Data) []Data { lock.Lock() - i := int64(0) for _, datum := range data { mapLock.Lock() executedEmpty[name] = append(executedEmpty[name], datum.(string)) mapLock.Unlock() - count := atomic.AddInt64(&i, 1) + count := atomic.AddInt64(&atomicCount, 1) if count >= numInQueue { close(stop) } @@ -226,8 +227,8 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { doneA := make(chan struct{}) doneB := make(chan struct{}) - go emptyQueue("QueueA", int64(hasA), doneA) - go emptyQueue("QueueB", int64(hasB), doneB) + go emptyQueue("QueueA", hasA, doneA) + go emptyQueue("QueueB", hasB, doneB) <-doneA <-doneB @@ -243,8 +244,8 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { hasEmpty = map[string][]string{} mapLock.Unlock() - hasQueueAChan = make(chan int) - hasQueueBChan = make(chan int) + hasQueueAChan = make(chan int64) + hasQueueBChan = make(chan int64) go fillQueue("QueueA", hasQueueAChan) go fillQueue("QueueB", hasQueueBChan) @@ -255,8 +256,8 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { doneA = make(chan struct{}) doneB = make(chan struct{}) - go emptyQueue("QueueA", int64(hasA), doneA) - go emptyQueue("QueueB", int64(hasB), doneB) + go emptyQueue("QueueA", hasA, doneA) + go emptyQueue("QueueB", hasB, doneB) <-doneA <-doneB From 5bf411926d99da48dde0259da24a62b5900d9940 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 3 May 2023 21:07:35 +0100 Subject: [PATCH 4/4] re-enable test Signed-off-by: Andrew Thornton --- modules/queue/unique_queue_disk_channel_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index d6856799a5afa..11a1d4b88d2bd 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -4,7 +4,6 @@ package queue import ( - "os" "strconv" "sync" "sync/atomic" @@ -17,10 +16,7 @@ import ( ) func TestPersistableChannelUniqueQueue(t *testing.T) { - if os.Getenv("CI") != "" { - t.Skip("Skipping because test is flaky on CI") - } - + // Create a temporary directory for the queue tmpDir := t.TempDir() _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)