forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_queue_pause_test.go
127 lines (105 loc) · 3.55 KB
/
example_queue_pause_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package river_test
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/internal/util/slogutil"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)
type ReportingArgs struct{}
func (args ReportingArgs) Kind() string { return "Reporting" }
type ReportingWorker struct {
river.WorkerDefaults[ReportingArgs]
jobWorkedCh chan<- string
}
func (w *ReportingWorker) Work(ctx context.Context, job *river.Job[ReportingArgs]) error {
select {
case <-ctx.Done():
return ctx.Err()
case w.jobWorkedCh <- job.Queue:
return nil
}
}
// Example_queuePause demonstrates how to pause queues to prevent them from
// working new jobs, and later resume them.
func Example_queuePause() {
ctx := context.Background()
dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
if err != nil {
panic(err)
}
defer dbPool.Close()
// Required for the purpose of this test, but not necessary in real usage.
if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
panic(err)
}
const (
unreliableQueue = "unreliable_external_service"
reliableQueue = "reliable_jobs"
)
workers := river.NewWorkers()
jobWorkedCh := make(chan string)
river.AddWorker(workers, &ReportingWorker{jobWorkedCh: jobWorkedCh})
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
unreliableQueue: {MaxWorkers: 10},
reliableQueue: {MaxWorkers: 10},
},
Workers: workers,
})
if err != nil {
panic(err)
}
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
// Out of example scope, but used to wait until a queue is paused or unpaused.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindQueuePaused, river.EventKindQueueResumed)
defer subscribeCancel()
fmt.Printf("Pausing %s queue\n", unreliableQueue)
if err := riverClient.QueuePause(ctx, unreliableQueue, nil); err != nil {
panic(err)
}
// Wait for queue to be paused:
waitOrTimeout(subscribeChan)
fmt.Println("Inserting one job each into unreliable and reliable queues")
if _, err = riverClient.Insert(ctx, ReportingArgs{}, &river.InsertOpts{Queue: unreliableQueue}); err != nil {
panic(err)
}
if _, err = riverClient.Insert(ctx, ReportingArgs{}, &river.InsertOpts{Queue: reliableQueue}); err != nil {
panic(err)
}
// The unreliable queue is paused so its job should get worked yet, while
// reliable queue is not paused so its job should get worked immediately:
receivedQueue := waitOrTimeout(jobWorkedCh)
fmt.Printf("Job worked on %s queue\n", receivedQueue)
// Resume the unreliable queue so it can work the job:
fmt.Printf("Resuming %s queue\n", unreliableQueue)
if err := riverClient.QueueResume(ctx, unreliableQueue, nil); err != nil {
panic(err)
}
receivedQueue = waitOrTimeout(jobWorkedCh)
fmt.Printf("Job worked on %s queue\n", receivedQueue)
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
// Output:
// Pausing unreliable_external_service queue
// Inserting one job each into unreliable and reliable queues
// Job worked on reliable_jobs queue
// Resuming unreliable_external_service queue
// Job worked on unreliable_external_service queue
}
func waitOrTimeout[T any](ch <-chan T) T {
select {
case item := <-ch:
return item
case <-time.After(5 * time.Second):
panic("WaitOrTimeout timed out after waiting 5s")
}
}