From a610c89655c180d01bf4b5043a03ebb5008511f4 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 25 Feb 2023 22:42:16 +0000 Subject: [PATCH 1/9] Properly flush unique queues on startup There have been a number of reports of blocked PRs being checked which have been difficult to debug. In investigating #23050 I have realised that whilst the Warn there is somewhat of a miscall there was a real bug in the way that the LevelUniqueQueue was being restored on start-up of the PersistableChannelUniqueQueue. This PR fixes this bug and adds a testcase. Fix #23050 and others Signed-off-by: Andrew Thornton --- modules/queue/unique_queue_disk_channel.go | 24 +- .../queue/unique_queue_disk_channel_test.go | 223 ++++++++++++++++++ 2 files changed, 240 insertions(+), 7 deletions(-) create mode 100644 modules/queue/unique_queue_disk_channel_test.go diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 405726182dcbc..7934342bea3f5 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -209,17 +209,27 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) atTerminate(q.Terminate) _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 { + if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() { // Just run the level queue - we shut it down once it's flushed - go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go luq.Run(func(_ func()) {}, func(_ func()) {}) go func() { - _ = q.internal.Flush(0) - log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name()) - q.internal.(*LevelUniqueQueue).Shutdown() - GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + _ = luq.Flush(0) + for !luq.IsEmpty() { + _ = luq.Flush(0) + select { + case <-time.After(100 * time.Millisecond): + case <-luq.shutdownCtx.Done(): + log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name()) + return + } + } + log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name()) + luq.Shutdown() + GetManager().Remove(luq.qid) }() } else { log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + _ = q.internal.Flush(0) q.internal.(*LevelUniqueQueue).Shutdown() GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) } @@ -286,7 +296,7 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { close(q.channelQueue.dataChan) log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) + _ = q.internal.(*LevelUniqueQueue).Push(data) } log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go new file mode 100644 index 0000000000000..274b1a83ad4c8 --- /dev/null +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -0,0 +1,223 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package queue + +import ( + "fmt" + "strconv" + "sync" + "testing" + "time" + + "code.gitea.io/gitea/modules/log" + "github.com/stretchr/testify/assert" +) + +func TestPersistableChannelUniqueQueue(t *testing.T) { + tmpDir := t.TempDir() + fmt.Printf("TempDir %s\n", tmpDir) + _ = log.NewLogger(1000, "console", "console", `{"level":"trace","stacktracelevel":"NONE","stderr":true}`) + + // Common function to create the Queue + newQueue := func(handle func(data ...Data) []Data) Queue { + q, err := NewPersistableChannelUniqueQueue(handle, + PersistableChannelUniqueQueueConfiguration{ + Name: "TestPersistableChannelUniqueQueue", + DataDir: tmpDir, + QueueLength: 200, + MaxWorkers: 1, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 1, + Workers: 0, + }, "task-0") + assert.NoError(t, err) + return q + } + + // runs the provided queue and provides some timer function + type channels struct { + readyForShutdown chan struct{} // closed when shutdown functions have been assigned + readyForTerminate chan struct{} // closed when terminate functions have been assigned + signalShutdown chan struct{} // Should close to signal shutdown + doneShutdown chan struct{} // closed when shutdown function is done + queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock + } + runQueue := func(q Queue, lock *sync.Mutex) *channels { + returnable := &channels{ + readyForShutdown: make(chan struct{}), + readyForTerminate: make(chan struct{}), + signalShutdown: make(chan struct{}), + doneShutdown: make(chan struct{}), + } + go q.Run(func(atShutdown func()) { + go func() { + lock.Lock() + select { + case <-returnable.readyForShutdown: + default: + close(returnable.readyForShutdown) + } + lock.Unlock() + <-returnable.signalShutdown + atShutdown() + close(returnable.doneShutdown) + }() + }, func(atTerminate func()) { + lock.Lock() + defer lock.Unlock() + select { + case <-returnable.readyForTerminate: + default: + close(returnable.readyForTerminate) + } + returnable.queueTerminate = append(returnable.queueTerminate, atTerminate) + }) + + return returnable + } + + // call to shutdown and terminate the queue associated with the channels + shutdownAndTerminate := func(chans *channels, lock *sync.Mutex) { + close(chans.signalShutdown) + <-chans.doneShutdown + <-chans.readyForTerminate + + lock.Lock() + callbacks := []func(){} + callbacks = append(callbacks, chans.queueTerminate...) 
+ lock.Unlock() + + for _, callback := range callbacks { + callback() + } + } + + executedTasks1 := []string{} + hasTasks1 := []string{} + + t.Run("Initial Filling", func(t *testing.T) { + lock := sync.Mutex{} + + startAt100Queued := make(chan struct{}) + stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item + + handle := func(data ...Data) []Data { + <-startAt100Queued + for _, datum := range data { + s := datum.(string) + lock.Lock() + executedTasks1 = append(executedTasks1, s) + lock.Unlock() + if s == "task-20" { + close(stopAt20Shutdown) + } + } + return nil + } + + q := newQueue(handle) + + // add 100 tasks to the queue + for i := 0; i < 100; i++ { + _ = q.Push("task-" + strconv.Itoa(i)) + } + close(startAt100Queued) + + chans := runQueue(q, &lock) + + <-chans.readyForShutdown + <-stopAt20Shutdown + shutdownAndTerminate(chans, &lock) + + // check which tasks are still in the queue + for i := 0; i < 100; i++ { + if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + hasTasks1 = append(hasTasks1, "task-"+strconv.Itoa(i)) + } + } + assert.Equal(t, 100, len(executedTasks1)+len(hasTasks1)) + }) + + executedTasks2 := []string{} + hasTasks2 := []string{} + t.Run("Ensure that things will empty on restart", func(t *testing.T) { + lock := sync.Mutex{} + stop := make(chan struct{}) + + // collect the tasks that have been executed + handle := func(data ...Data) []Data { + lock.Lock() + for _, datum := range data { + t.Logf("executed %s", datum.(string)) + executedTasks2 = append(executedTasks2, datum.(string)) + if datum.(string) == "task-99" { + close(stop) + } + } + lock.Unlock() + return nil + } + + q := newQueue(handle) + chans := runQueue(q, &lock) + + <-chans.readyForShutdown + <-stop + shutdownAndTerminate(chans, &lock) + + // check which tasks are still in the queue + for i := 0; i < 100; i++ { + if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + hasTasks2 = append(hasTasks2, "task-"+strconv.Itoa(i)) + } + } + + assert.Equal(t, 100, len(executedTasks1)+len(executedTasks2)) + assert.Equal(t, 0, len(hasTasks2)) + }) + + executedTasks3 := []string{} + hasTasks3 := []string{} + + t.Run("refill", func(t *testing.T) { + lock := sync.Mutex{} + stop := make(chan struct{}) + + handle := func(data ...Data) []Data { + lock.Lock() + for _, datum := range data { + executedTasks3 = append(executedTasks3, datum.(string)) + } + lock.Unlock() + return nil + } + + q := newQueue(handle) + chans := runQueue(q, &lock) + + // re-run all tasks + for i := 0; i < 100; i++ { + _ = q.Push("task-" + strconv.Itoa(i)) + } + + // wait for a while + time.Sleep(1 * time.Second) + + close(stop) + <-chans.readyForShutdown + shutdownAndTerminate(chans, &lock) + + // check whether the tasks are still in the queue + for i := 0; i < 100; i++ { + if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + hasTasks3 = append(hasTasks3, "task-"+strconv.Itoa(i)) + } + } + assert.Equal(t, 100, len(executedTasks3)+len(hasTasks3)) + }) + + t.Logf("TestPersistableChannelUniqueQueue completed1=%v, executed2=%v, has2=%v, executed3=%v, has3=%v", + len(executedTasks1), len(executedTasks2), len(hasTasks2), len(executedTasks3), len(hasTasks3)) +} From 9e5ea4c6b3a19a50143262759ece5ba984b15b5d Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 25 Feb 2023 23:05:40 +0000 Subject: [PATCH 2/9] Update modules/queue/unique_queue_disk_channel_test.go --- modules/queue/unique_queue_disk_channel_test.go | 1 + 1 file changed, 1 insertion(+) diff --git 
a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index 274b1a83ad4c8..a1e9e4808ad4e 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -11,6 +11,7 @@ import ( "time" "code.gitea.io/gitea/modules/log" + "github.com/stretchr/testify/assert" ) From 7c99ea6e10408591fd4d6cba340906500c26f2f2 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 26 Feb 2023 09:27:13 +0000 Subject: [PATCH 3/9] And set the internal queue name for the queues Thanks to brechtvl for noticing this issue! Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 3 ++- modules/queue/unique_queue_disk_channel.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index c7526714c65cb..16d805695f221 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( }, Workers: 0, }, - DataDir: config.DataDir, + DataDir: config.DataDir, + QueueName: config.Name + "-level", } levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 7934342bea3f5..bc2f3014a438a 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac }, Workers: 0, }, - DataDir: config.DataDir, + DataDir: config.DataDir, + QueueName: config.Name + "-level", } queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue) From b8fa5cb7dce2bba6556f7d06b656e390402104c6 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 26 Feb 2023 10:43:37 +0000 Subject: [PATCH 4/9] Improve test to include some interleaving of queues Signed-off-by: Andrew Thornton --- .../queue/unique_queue_disk_channel_test.go | 211 +++++++++--------- 1 file changed, 111 insertions(+), 100 deletions(-) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index a1e9e4808ad4e..151312b174d3b 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -21,10 +21,10 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { _ = log.NewLogger(1000, "console", "console", `{"level":"trace","stacktracelevel":"NONE","stderr":true}`) // Common function to create the Queue - newQueue := func(handle func(data ...Data) []Data) Queue { + newQueue := func(name string, handle func(data ...Data) []Data) Queue { q, err := NewPersistableChannelUniqueQueue(handle, PersistableChannelUniqueQueueConfiguration{ - Name: "TestPersistableChannelUniqueQueue", + Name: name, DataDir: tmpDir, QueueLength: 200, MaxWorkers: 1, @@ -95,130 +95,141 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { } } - executedTasks1 := []string{} - hasTasks1 := []string{} + executedInitial := map[string][]string{} + hasInitial := map[string][]string{} + + fillQueue := func(name string, done chan struct{}) { + t.Run("Initial Filling: "+name, func(t *testing.T) { + lock := sync.Mutex{} + + startAt100Queued := make(chan struct{}) + stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item + + handle := func(data ...Data) []Data { + <-startAt100Queued + for _, datum := range data { + s := 
datum.(string) + lock.Lock() + executedInitial[name] = append(executedInitial[name], s) + lock.Unlock() + if s == "task-20" { + close(stopAt20Shutdown) + } + } + return nil + } - t.Run("Initial Filling", func(t *testing.T) { - lock := sync.Mutex{} + q := newQueue(name, handle) - startAt100Queued := make(chan struct{}) - stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item + // add 100 tasks to the queue + for i := 0; i < 100; i++ { + _ = q.Push("task-" + strconv.Itoa(i)) + } + close(startAt100Queued) - handle := func(data ...Data) []Data { - <-startAt100Queued - for _, datum := range data { - s := datum.(string) - lock.Lock() - executedTasks1 = append(executedTasks1, s) - lock.Unlock() - if s == "task-20" { - close(stopAt20Shutdown) + chans := runQueue(q, &lock) + + <-chans.readyForShutdown + <-stopAt20Shutdown + shutdownAndTerminate(chans, &lock) + + // check which tasks are still in the queue + for i := 0; i < 100; i++ { + if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i)) } } - return nil - } + assert.Equal(t, 100, len(executedInitial[name])+len(hasInitial[name])) + }) + close(done) + } - q := newQueue(handle) + doneA := make(chan struct{}) + doneB := make(chan struct{}) - // add 100 tasks to the queue - for i := 0; i < 100; i++ { - _ = q.Push("task-" + strconv.Itoa(i)) - } - close(startAt100Queued) + go fillQueue("QueueA", doneA) + go fillQueue("QueueB", doneB) - chans := runQueue(q, &lock) + <-doneA + <-doneB - <-chans.readyForShutdown - <-stopAt20Shutdown - shutdownAndTerminate(chans, &lock) + executedEmpty := map[string][]string{} + hasEmpty := map[string][]string{} + emptyQueue := func(name string, done chan struct{}) { + t.Run("Empty Queue: "+name, func(t *testing.T) { + lock := sync.Mutex{} + stop := make(chan struct{}) - // check which tasks are still in the queue - for i := 0; i < 100; i++ { - if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { - hasTasks1 = append(hasTasks1, "task-"+strconv.Itoa(i)) + // collect the tasks that have been executed + handle := func(data ...Data) []Data { + lock.Lock() + for _, datum := range data { + t.Logf("executed %s", datum.(string)) + executedEmpty[name] = append(executedEmpty[name], datum.(string)) + if datum.(string) == "task-99" { + close(stop) + } + } + lock.Unlock() + return nil } - } - assert.Equal(t, 100, len(executedTasks1)+len(hasTasks1)) - }) - executedTasks2 := []string{} - hasTasks2 := []string{} - t.Run("Ensure that things will empty on restart", func(t *testing.T) { - lock := sync.Mutex{} - stop := make(chan struct{}) + q := newQueue(name, handle) + chans := runQueue(q, &lock) - // collect the tasks that have been executed - handle := func(data ...Data) []Data { - lock.Lock() - for _, datum := range data { - t.Logf("executed %s", datum.(string)) - executedTasks2 = append(executedTasks2, datum.(string)) - if datum.(string) == "task-99" { - close(stop) + <-chans.readyForShutdown + <-stop + shutdownAndTerminate(chans, &lock) + + // check which tasks are still in the queue + for i := 0; i < 100; i++ { + if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i)) } } - lock.Unlock() - return nil - } - q := newQueue(handle) - chans := runQueue(q, &lock) + assert.Equal(t, 100, len(executedInitial[name])+len(executedEmpty[name])) + assert.Equal(t, 0, len(hasEmpty[name])) + }) + close(done) + } - <-chans.readyForShutdown - <-stop - 
shutdownAndTerminate(chans, &lock) + doneA = make(chan struct{}) + doneB = make(chan struct{}) - // check which tasks are still in the queue - for i := 0; i < 100; i++ { - if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { - hasTasks2 = append(hasTasks2, "task-"+strconv.Itoa(i)) - } - } + go emptyQueue("QueueA", doneA) + go emptyQueue("QueueB", doneB) - assert.Equal(t, 100, len(executedTasks1)+len(executedTasks2)) - assert.Equal(t, 0, len(hasTasks2)) - }) + <-doneA + <-doneB - executedTasks3 := []string{} - hasTasks3 := []string{} + t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", + len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) - t.Run("refill", func(t *testing.T) { - lock := sync.Mutex{} - stop := make(chan struct{}) + // reset and rerun + executedInitial = map[string][]string{} + hasInitial = map[string][]string{} + executedEmpty = map[string][]string{} + hasEmpty = map[string][]string{} - handle := func(data ...Data) []Data { - lock.Lock() - for _, datum := range data { - executedTasks3 = append(executedTasks3, datum.(string)) - } - lock.Unlock() - return nil - } + doneA = make(chan struct{}) + doneB = make(chan struct{}) - q := newQueue(handle) - chans := runQueue(q, &lock) + go fillQueue("QueueA", doneA) + go fillQueue("QueueB", doneB) - // re-run all tasks - for i := 0; i < 100; i++ { - _ = q.Push("task-" + strconv.Itoa(i)) - } + <-doneA + <-doneB - // wait for a while - time.Sleep(1 * time.Second) + doneA = make(chan struct{}) + doneB = make(chan struct{}) - close(stop) - <-chans.readyForShutdown - shutdownAndTerminate(chans, &lock) + go emptyQueue("QueueA", doneA) + go emptyQueue("QueueB", doneB) - // check whether the tasks are still in the queue - for i := 0; i < 100; i++ { - if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { - hasTasks3 = append(hasTasks3, "task-"+strconv.Itoa(i)) - } - } - assert.Equal(t, 100, len(executedTasks3)+len(hasTasks3)) - }) + <-doneA + <-doneB - t.Logf("TestPersistableChannelUniqueQueue completed1=%v, executed2=%v, has2=%v, executed3=%v, has3=%v", - len(executedTasks1), len(executedTasks2), len(hasTasks2), len(executedTasks3), len(hasTasks3)) + t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", + len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) } From 28544488e603f6293dc2d8f282443eb7114a71bf Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 26 Feb 2023 10:50:25 +0000 Subject: [PATCH 5/9] Queues need to be named the same - fix TestPersistsableChannelQueue Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 318610355e433..4f14a5d79df92 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -39,7 +39,7 @@ func TestPersistableChannelQueue(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "first", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) @@ -135,7 +135,7 @@ func TestPersistableChannelQueue(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "second", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) @@ -227,7 
+227,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "first", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) @@ -433,7 +433,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "second", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) pausable, ok = queue.(Pausable) From a233641d37e9abb76bea4eef23ee10ee1a343fa7 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 26 Feb 2023 11:15:15 +0000 Subject: [PATCH 6/9] adjust logging slightly to remove some more spurious logs Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 26 +++++++++++++++++----- modules/queue/unique_queue_disk_channel.go | 18 +++++++++++++-- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 16d805695f221..91f91f0dfc80d 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -173,16 +173,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) atTerminate(q.Terminate) - if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 { + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 { // Just run the level queue - we shut it down once it's flushed go q.internal.Run(func(_ func()) {}, func(_ func()) {}) go func() { - for !q.IsEmpty() { - _ = q.internal.Flush(0) + for !lq.IsEmpty() { + _ = lq.Flush(0) select { case <-time.After(100 * time.Millisecond): - case <-q.internal.(*LevelQueue).shutdownCtx.Done(): - log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + case <-lq.shutdownCtx.Done(): + if lq.byteFIFO.Len(lq.terminateCtx) > 0 { + log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + } return } } @@ -317,10 +319,22 @@ func (q *PersistableChannelQueue) Shutdown() { // Redirect all remaining data in the chan to the internal channel log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) close(q.channelQueue.dataChan) + countOK, countLost := 0, 0 for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) + err := q.internal.Push(data) + if err != nil { + log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err) + countLost++ + } else { + countOK++ + } atomic.AddInt64(&q.channelQueue.numInQueue, -1) } + if countLost > 0 { + log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost) + } else if countOK > 0 { + log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK) + } log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index bc2f3014a438a..cc8a807c67237 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -220,7 +220,9 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) select { case <-time.After(100 * time.Millisecond): case <-luq.shutdownCtx.Done(): - log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name()) + 
if luq.byteFIFO.Len(luq.terminateCtx) > 0 { + log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name()) + } return } } @@ -296,8 +298,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { // Redirect all remaining data in the chan to the internal channel close(q.channelQueue.dataChan) log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + countOK, countLost := 0, 0 for data := range q.channelQueue.dataChan { - _ = q.internal.(*LevelUniqueQueue).Push(data) + err := q.internal.(*LevelUniqueQueue).Push(data) + if err != nil { + log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err) + countLost++ + } else { + countOK++ + } + } + if countLost > 0 { + log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost) + } else if countOK > 0 { + log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK) } log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) From 7396bf79621acfb262ce3f715f980994c46b9fc7 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 26 Feb 2023 11:38:21 +0000 Subject: [PATCH 7/9] try to avoid warning and lock issues Signed-off-by: Andrew Thornton --- modules/queue/queue_channel.go | 5 ++++- .../queue/unique_queue_disk_channel_test.go | 22 +++++++++++++++---- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 6f75b8357eabc..baac097393956 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -124,7 +124,10 @@ func (q *ChannelQueue) Shutdown() { log.Trace("ChannelQueue: %s Flushing", q.name) // We can't use Cleanup here because that will close the channel if err := q.FlushWithContext(q.terminateCtx); err != nil { - log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + count := atomic.LoadInt64(&q.numInQueue) + if count > 0 { + log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + } return } log.Debug("ChannelQueue: %s Flushed", q.name) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index 151312b174d3b..858594465e9d2 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -18,7 +18,7 @@ import ( func TestPersistableChannelUniqueQueue(t *testing.T) { tmpDir := t.TempDir() fmt.Printf("TempDir %s\n", tmpDir) - _ = log.NewLogger(1000, "console", "console", `{"level":"trace","stacktracelevel":"NONE","stderr":true}`) + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) // Common function to create the Queue newQueue := func(name string, handle func(data ...Data) []Data) Queue { @@ -95,6 +95,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { } } + mapLock := sync.Mutex{} executedInitial := map[string][]string{} hasInitial := map[string][]string{} @@ -109,9 +110,9 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { <-startAt100Queued for _, datum := range data { s := datum.(string) - lock.Lock() + mapLock.Lock() executedInitial[name] = append(executedInitial[name], s) - lock.Unlock() + mapLock.Unlock() if s == "task-20" { close(stopAt20Shutdown) } @@ -136,10 +137,14 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { // check which tasks are still 
in the queue for i := 0; i < 100; i++ { if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + mapLock.Lock() hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i)) + mapLock.Unlock() } } + mapLock.Lock() assert.Equal(t, 100, len(executedInitial[name])+len(hasInitial[name])) + mapLock.Unlock() }) close(done) } @@ -164,8 +169,9 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { handle := func(data ...Data) []Data { lock.Lock() for _, datum := range data { - t.Logf("executed %s", datum.(string)) + mapLock.Lock() executedEmpty[name] = append(executedEmpty[name], datum.(string)) + mapLock.Unlock() if datum.(string) == "task-99" { close(stop) } @@ -184,12 +190,16 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { // check which tasks are still in the queue for i := 0; i < 100; i++ { if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + mapLock.Lock() hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i)) + mapLock.Unlock() } } + mapLock.Lock() assert.Equal(t, 100, len(executedInitial[name])+len(executedEmpty[name])) assert.Equal(t, 0, len(hasEmpty[name])) + mapLock.Unlock() }) close(done) } @@ -203,6 +213,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { <-doneA <-doneB + mapLock.Lock() t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) @@ -211,6 +222,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { hasInitial = map[string][]string{} executedEmpty = map[string][]string{} hasEmpty = map[string][]string{} + mapLock.Unlock() doneA = make(chan struct{}) doneB = make(chan struct{}) @@ -230,6 +242,8 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { <-doneA <-doneB + mapLock.Lock() t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) + mapLock.Unlock() } From efe654de08c8fc5c08745f5ec38954b7e8c23421 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 26 Feb 2023 11:42:30 +0000 Subject: [PATCH 8/9] avoid logging the channel early shutdown message if the q is not empty Signed-off-by: Andrew Thornton --- modules/queue/unique_queue_channel.go | 4 +++- modules/queue/unique_queue_channel_test.go | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index c43bd1db3f7da..62c051aa3935e 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -177,7 +177,9 @@ func (q *ChannelUniqueQueue) Shutdown() { go func() { log.Trace("ChannelUniqueQueue: %s Flushing", q.name) if err := q.FlushWithContext(q.terminateCtx); err != nil { - log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + if !q.IsEmpty() { + log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + } return } log.Debug("ChannelUniqueQueue: %s Flushed", q.name) diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go index 9372694b87a6d..824015b834fee 100644 --- a/modules/queue/unique_queue_channel_test.go +++ b/modules/queue/unique_queue_channel_test.go @@ -8,10 +8,13 @@ import ( "testing" "time" + 
"code.gitea.io/gitea/modules/log" + "github.com/stretchr/testify/assert" ) func TestChannelUniqueQueue(t *testing.T) { + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) handleChan := make(chan *testData) handle := func(data ...Data) []Data { for _, datum := range data { @@ -52,6 +55,8 @@ func TestChannelUniqueQueue(t *testing.T) { } func TestChannelUniqueQueue_Batch(t *testing.T) { + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) + handleChan := make(chan *testData) handle := func(data ...Data) []Data { for _, datum := range data { @@ -98,6 +103,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) { } func TestChannelUniqueQueue_Pause(t *testing.T) { + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) + lock := sync.Mutex{} var queue Queue var err error From 8cbcff2f968794aca48a709f391cdc52f6ee7cf6 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 26 Feb 2023 21:27:21 +0000 Subject: [PATCH 9/9] fix testcase Signed-off-by: Andrew Thornton --- .../queue/unique_queue_disk_channel_test.go | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go index 858594465e9d2..fd76163f4aaca 100644 --- a/modules/queue/unique_queue_disk_channel_test.go +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -46,7 +46,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock } runQueue := func(q Queue, lock *sync.Mutex) *channels { - returnable := &channels{ + chans := &channels{ readyForShutdown: make(chan struct{}), readyForTerminate: make(chan struct{}), signalShutdown: make(chan struct{}), @@ -56,33 +56,31 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { go func() { lock.Lock() select { - case <-returnable.readyForShutdown: + case <-chans.readyForShutdown: default: - close(returnable.readyForShutdown) + close(chans.readyForShutdown) } lock.Unlock() - <-returnable.signalShutdown + <-chans.signalShutdown atShutdown() - close(returnable.doneShutdown) + close(chans.doneShutdown) }() }, func(atTerminate func()) { lock.Lock() defer lock.Unlock() select { - case <-returnable.readyForTerminate: + case <-chans.readyForTerminate: default: - close(returnable.readyForTerminate) + close(chans.readyForTerminate) } - returnable.queueTerminate = append(returnable.queueTerminate, atTerminate) + chans.queueTerminate = append(chans.queueTerminate, atTerminate) }) - return returnable + return chans } // call to shutdown and terminate the queue associated with the channels - shutdownAndTerminate := func(chans *channels, lock *sync.Mutex) { - close(chans.signalShutdown) - <-chans.doneShutdown + doTerminate := func(chans *channels, lock *sync.Mutex) { <-chans.readyForTerminate lock.Lock() @@ -132,7 +130,9 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { <-chans.readyForShutdown <-stopAt20Shutdown - shutdownAndTerminate(chans, &lock) + close(chans.signalShutdown) + <-chans.doneShutdown + _ = q.Push("final") // check which tasks are still in the queue for i := 0; i < 100; i++ { @@ -142,8 +142,16 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { mapLock.Unlock() } } + if has, _ := q.(UniqueQueue).Has("final"); has { + mapLock.Lock() + hasInitial[name] = append(hasInitial[name], "final") + 
mapLock.Unlock() + } else { + assert.Fail(t, "UnqueQueue %s should have \"final\"", name) + } + doTerminate(chans, &lock) mapLock.Lock() - assert.Equal(t, 100, len(executedInitial[name])+len(hasInitial[name])) + assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name])) mapLock.Unlock() }) close(done) @@ -172,7 +180,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { mapLock.Lock() executedEmpty[name] = append(executedEmpty[name], datum.(string)) mapLock.Unlock() - if datum.(string) == "task-99" { + if datum.(string) == "final" { close(stop) } } @@ -185,7 +193,8 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { <-chans.readyForShutdown <-stop - shutdownAndTerminate(chans, &lock) + close(chans.signalShutdown) + <-chans.doneShutdown // check which tasks are still in the queue for i := 0; i < 100; i++ { @@ -195,9 +204,10 @@ func TestPersistableChannelUniqueQueue(t *testing.T) { mapLock.Unlock() } } + doTerminate(chans, &lock) mapLock.Lock() - assert.Equal(t, 100, len(executedInitial[name])+len(executedEmpty[name])) + assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name])) assert.Equal(t, 0, len(hasEmpty[name])) mapLock.Unlock() })
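
The startup-drain pattern that patches 1 and 6 converge on can be read in isolation as the minimal sketch below. It is illustrative only, not Gitea's implementation: fakeBacklog and drainOnStartup are invented stand-ins for the persisted LevelUniqueQueue, its byteFIFO, and the goroutine added in the diffs above. The idea is to retry Flush until the backlog reports empty, back off briefly between passes, and, if shutdown wins the race, warn only when data actually remains.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fakeBacklog stands in for the persisted level queue: each Flush call drains
// at most a few items, so several passes are needed before it reports empty.
type fakeBacklog struct {
	mu    sync.Mutex
	items []string
}

func (b *fakeBacklog) Flush(_ time.Duration) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	n := 3
	if len(b.items) < n {
		n = len(b.items)
	}
	if n > 0 {
		fmt.Println("flushed", b.items[:n])
		b.items = b.items[n:]
	}
	return nil
}

func (b *fakeBacklog) IsEmpty() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.items) == 0
}

// drainOnStartup keeps flushing until the backlog is empty; if shutdown is
// requested first it returns, warning only when data is actually left behind.
func drainOnStartup(shutdownCtx context.Context, b *fakeBacklog) {
	for !b.IsEmpty() {
		_ = b.Flush(0)
		select {
		case <-time.After(100 * time.Millisecond):
			// brief pause before the next pass
		case <-shutdownCtx.Done():
			if !b.IsEmpty() {
				fmt.Println("shut down before the backlog was completely flushed")
			}
			return
		}
	}
	fmt.Println("backlog flushed, the level queue can be shut down")
}

func main() {
	b := &fakeBacklog{items: []string{"task-0", "task-1", "task-2", "task-3", "task-4"}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	drainOnStartup(ctx, b)
}

The behavioural details the sketch mirrors from the series are: Flush is retried in a loop rather than assumed to drain everything in one call, the shutdown context terminates the loop cleanly, and the "shut down before completely flushed" warning is emitted only when something was genuinely left in the queue, which is what removes the spurious warnings addressed in patches 6 through 8.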