forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
job_complete_tx_test.go
73 lines (54 loc) · 1.75 KB
/
job_complete_tx_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package river
import (
"context"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/internal/riverinternaltest/testfactory"
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
)
func TestJobCompleteTx(t *testing.T) {
t.Parallel()
ctx := context.Background()
type JobArgs struct {
JobArgsReflectKind[JobArgs]
}
type testBundle struct {
exec riverdriver.Executor
tx pgx.Tx
}
setup := func(t *testing.T) *testBundle {
t.Helper()
tx := riverinternaltest.TestTx(ctx, t)
return &testBundle{
exec: riverpgxv5.New(nil).UnwrapExecutor(tx),
tx: tx,
}
}
t.Run("CompletesJob", func(t *testing.T) {
t.Parallel()
bundle := setup(t)
job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
State: ptrutil.Ptr(rivertype.JobStateRunning),
})
completedJob, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCompleted, completedJob.State)
require.WithinDuration(t, time.Now(), *completedJob.FinalizedAt, 2*time.Second)
updatedJob, err := bundle.exec.JobGetByID(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCompleted, updatedJob.State)
})
t.Run("ErrorIfNotRunning", func(t *testing.T) {
t.Parallel()
bundle := setup(t)
job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
_, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.EqualError(t, err, "job must be running")
})
}