-
Notifications
You must be signed in to change notification settings - Fork 0
/
dlq.go
116 lines (92 loc) · 2.44 KB
/
dlq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package wkafka
import (
"context"
"errors"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
)
type dlqProcess[T any] struct {
customer *customer[T]
isRevokedRecord func(r *kgo.Record) bool
setDLQRecord func(*kgo.Record)
processDLQ func(ctx context.Context, msg T) error
}
func newDLQProcess[T any](
c *customer[T],
isRevokedRecord func(r *kgo.Record) bool,
dlqRecord func(*kgo.Record),
processDLQ func(ctx context.Context, msg T) error,
) *dlqProcess[T] {
return &dlqProcess[T]{
customer: c,
isRevokedRecord: isRevokedRecord,
setDLQRecord: dlqRecord,
processDLQ: processDLQ,
}
}
func (d *dlqProcess[T]) iterationRecordDLQ(ctx context.Context, r *kgo.Record) error {
if d.customer.Skip(d.customer.Cfg, r) {
d.customer.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset)
return nil
}
if d.customer.PreCheck != nil {
if err := d.customer.PreCheck(ctx, r); err != nil {
if errors.Is(err, ErrSkip) {
return nil
}
return fmt.Errorf("pre check failed: %w", err)
}
}
data, err := d.customer.Decode(r.Value, r)
if err != nil {
if errors.Is(err, ErrSkip) {
return nil
}
return fmt.Errorf("decode record failed: %w", err)
}
ctxCallback := context.WithValue(ctx, KeyRecord, r)
ctxCallback = context.WithValue(ctxCallback, KeyIsDLQProcess, true)
if err := d.processDLQ(ctxCallback, data); err != nil {
return err
}
return nil
}
// Iteration is used to listen DLQ topics, error usually comes from context cancellation.
// Any kind of error will be retry with interval.
func (d *dlqProcess[T]) Iteration(ctx context.Context, r *kgo.Record) error {
wait := newWaitRetry(d.customer.Cfg.DLQ.RetryInterval, d.customer.Cfg.DLQ.RetryMaxInterval)
firstIteration := true
defer func() {
d.setDLQRecord(nil)
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if d.isRevokedRecord(r) {
return errPartitionRevoked
}
if err := d.iterationRecordDLQ(ctx, r); err != nil {
errOrg, ok := IsDQLError(err)
var errWrapped error
if ok {
errWrapped = wrapErr(r, errOrg.Err, true)
} else {
errWrapped = wrapErr(r, err, true)
}
d.customer.Logger.Error("DLQ process failed", "error", errWrapped, "retry_interval", wait.CurrentInterval().String())
if firstIteration {
d.setDLQRecord(r)
firstIteration = false
}
if err := wait.Sleep(ctx); err != nil {
return err
}
continue
}
break
}
return nil
}