-
Notifications
You must be signed in to change notification settings - Fork 2
/
queue_message.go
135 lines (129 loc) · 3.3 KB
/
queue_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package sq
import (
"context"
"database/sql"
xerr "github.com/goclub/error"
"time"
)
// Message is one row of a queue table. The table is chosen per value through
// QueueName (see TableName), so the same struct is used for every queue.
type Message struct {
	// QueueName is appended to "queue_" to form the table name; it has no db tag.
	QueueName string
	// ID is the row id. It is tagged ignoreInsert and is filled in by
	// AfterInsert from the insert result.
	ID uint64 `db:"id" sq:"ignoreInsert"`
	// BusinessID is copied into the dead-letter record by execDeadLetter.
	BusinessID uint64 `db:"business_id"`
	// NextConsumeTime maps to next_consume_time, the column execRequeue rewrites.
	NextConsumeTime time.Time `db:"next_consume_time"`
	// ConsumeChance is compared with MaxConsumeChance by execRequeue to decide
	// whether the message is dead-lettered instead of requeued.
	ConsumeChance uint16 `db:"consume_chance"`
	// MaxConsumeChance is the limit ConsumeChance is checked against.
	MaxConsumeChance uint16 `db:"max_consume_chance"`
	// UpdateID is not referenced by the methods in this file.
	// NOTE(review): presumably written by the consumer that claims the row — confirm.
	UpdateID string `db:"update_id"`
	// Priority is not referenced by the methods in this file.
	Priority uint8 `db:"priority"`
	// CreateTime is not referenced by the methods in this file.
	CreateTime time.Time `db:"create_time"`
	// consume supplies the default requeue delay (NextConsumeTime) and the
	// time location (queueTimeLocation) used by execRequeue.
	consume Consume
	// DefaultLifeCycle and WithoutSoftDelete are embedded types declared
	// elsewhere in the package.
	DefaultLifeCycle
	WithoutSoftDelete
}
// TableName returns the name of the queue table backing this message:
// the fixed prefix "queue_" followed by QueueName.
func (message *Message) TableName() string {
	const tablePrefix = "queue_"
	return tablePrefix + message.QueueName
}
// AfterInsert stores the id reported by the insert result on the message.
// If result.LastInsertUint64Id fails, that error is returned and ID is left
// untouched.
func (message *Message) AfterInsert(result Result) error {
	lastID, err := result.LastInsertUint64Id()
	if err == nil {
		message.ID = uint64(lastID)
		return nil
	}
	return err
}
// MessageResult records the outcome chosen for a message. Values are built by
// Message.Ack, Message.Requeue, Message.RequeueDelay and Message.DeadLetter,
// and can be extended with WithError.
// NOTE(review): the code that interprets these fields is outside this file.
type MessageResult struct {
	// ack is set by Message.Ack.
	ack bool
	// requeue is set by Message.Requeue and Message.RequeueDelay.
	requeue bool
	// requeueDelay is the duration passed to Message.RequeueDelay; it stays
	// zero for Message.Requeue.
	requeueDelay time.Duration
	// deadLetter is set by Message.DeadLetter.
	deadLetter bool
	// deadLetterReason is the reason passed to Message.DeadLetter.
	deadLetterReason string
	// err is the error attached by the constructor or by WithError; may be nil.
	err error
}
// WithError returns a copy of v with err attached.
//
// A nil err leaves the result unchanged. If v carries no error yet, err is
// stored as is. Otherwise the error already stored is kept and wrapped with
// err's message as a prefix, so both pieces of information survive.
func (v MessageResult) WithError(err error) MessageResult {
	if err == nil {
		return v
	}
	if v.err == nil {
		v.err = err
		return v
	}
	// Wrap the stored error, not err itself: wrapping err with its own message
	// would discard the earlier error and merely repeat the new text.
	v.err = xerr.WrapPrefix(err.Error(), v.err)
	return v
}
// Ack builds a MessageResult with only the ack flag set.
func (Message) Ack() MessageResult {
	var result MessageResult
	result.ack = true
	return result
}
// RequeueDelay builds a MessageResult that asks for a requeue after the given
// duration and carries err (which may be nil).
func (Message) RequeueDelay(duration time.Duration, err error) MessageResult {
	var result MessageResult
	result.requeue = true
	result.requeueDelay = duration
	result.err = err
	return result
}
// Requeue builds a MessageResult that asks for a requeue without an explicit
// delay (requeueDelay stays zero) and carries err (which may be nil).
func (Message) Requeue(err error) MessageResult {
	var result MessageResult
	result.requeue = true
	result.err = err
	return result
}
// DeadLetter builds a MessageResult that asks for the message to be
// dead-lettered with the given reason and carries err (which may be nil).
func (Message) DeadLetter(reason string, err error) MessageResult {
	var result MessageResult
	result.deadLetter = true
	result.deadLetterReason = reason
	result.err = err
	return result
}
// execAck acknowledges the message by hard-deleting its row, matched by id
// with a limit of 1, and returns whatever error the delete reports.
func (message Message) execAck(db *Database) (err error) {
	byID := QB{
		Where: And("id", Equal(message.ID)),
		Limit: 1,
	}
	return db.HardDelete(context.Background(), &message, byID)
}
// execRequeue schedules the message for another attempt by moving its
// next_consume_time into the future, or hands it to execDeadLetter with the
// reason "MAX_CONSUME_CHANCE" once ConsumeChance has reached MaxConsumeChance.
//
// delay is added to the current time (in message.consume.queueTimeLocation)
// to form the new next_consume_time. A zero delay means the value from
// message.consume.NextConsumeTime(ConsumeChance, MaxConsumeChance) is used.
func (message Message) execRequeue(db *Database, delay time.Duration) (err error) {
	// Compare with >= rather than ==: if the counter ever overshoots the limit
	// (for example because the limit was lowered after the row was written),
	// an equality check would never fire and the message would be requeued
	// indefinitely.
	if message.ConsumeChance >= message.MaxConsumeChance {
		return message.execDeadLetter(db, "MAX_CONSUME_CHANCE")
	}
	if delay == 0 {
		delay = message.consume.NextConsumeTime(message.ConsumeChance, message.MaxConsumeChance)
	}
	nextConsumeTime := time.Now().In(message.consume.queueTimeLocation).Add(delay)
	return db.Update(context.Background(), &message, QB{
		Where: And("id", Equal(message.ID)),
		Set:   Set("next_consume_time", nextConsumeTime),
		Limit: 1,
	})
}
// execDeadLetter moves the message out of its queue: within a single
// read-committed transaction it hard-deletes the queue row (by id, limit 1)
// and inserts a DeadLetterQueueMessage carrying the queue name, business id
// and reason. Either step failing rolls the transaction back with that error.
//
// An error is also returned if db.Begin reports a rollback without an error,
// since this function never requests one.
func (message Message) execDeadLetter(db *Database, reason string) (err error) {
	ctx := context.Background()

	// move is the transaction body: delete the queue row, then record the dead letter.
	move := func(tx *T) TxResult {
		var stepErr error

		byID := QB{
			Where: And("id", Equal(message.ID)),
			Limit: 1,
		}
		if stepErr = tx.HardDelete(ctx, &message, byID); stepErr != nil {
			return tx.RollbackWithError(stepErr)
		}

		deadLetter := &DeadLetterQueueMessage{
			QueueName:  message.QueueName,
			BusinessID: message.BusinessID,
			Reason:     reason,
		}
		if stepErr = tx.InsertModel(ctx, deadLetter, QB{}); stepErr != nil {
			return tx.RollbackWithError(stepErr)
		}
		return tx.Commit()
	}

	var rolledBack bool
	rolledBack, err = db.Begin(ctx, sql.LevelReadCommitted, move)
	switch {
	case err != nil:
		return err
	case rolledBack:
		return xerr.New("unexpected rollbackNoError")
	}
	return nil
}