-
Notifications
You must be signed in to change notification settings - Fork 2
/
queue.go
206 lines (200 loc) · 6.78 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package sq
import (
"context"
xerr "github.com/goclub/error"
"strconv"
"time"
)
// Publish describes a message to be published to a queue.
type Publish struct {
	// BusinessID is copied to the BusinessID of the stored message.
	BusinessID uint64
	// NextConsumeTime is the delay added to the publish time to obtain the
	// moment from which the message may first be consumed.
	NextConsumeTime time.Duration
	// MaxConsumeChance is copied to the MaxConsumeChance of the stored
	// message; consumers only pick messages whose consume_chance is still
	// below this value.
	MaxConsumeChance uint16
	// Priority of the message. A zero value is replaced by 100 when the
	// message is published.
	Priority uint8 `default:"100"`
}
// corePublishMessage builds a Message for queueName out of publish and stores
// it through s. It is the shared implementation behind the PublishMessage
// methods of Database and T.
//
// An empty queueName is rejected before anything is inserted. A zero
// publish.Priority is replaced by 100. The first consume time is the current
// time in queueTimeLocation plus publish.NextConsumeTime.
//
// The returned message is the value handed to InsertModel; it is returned
// together with the insert error, if any.
func corePublishMessage(ctx context.Context, queueTimeLocation *time.Location, s interface {
	InsertModel(ctx context.Context, ptr Model, qb QB) (err error)
}, queueName string, publish Publish) (message Message, err error) {
	if queueName == "" {
		return message, xerr.New("goclub/sql: PublishMessage(ctx, queueName, publish) queue can not be empty string")
	}

	priority := publish.Priority
	if priority == 0 {
		priority = 100
	}
	firstConsumeAt := time.Now().In(queueTimeLocation).Add(publish.NextConsumeTime)

	message = Message{
		QueueName:        queueName,
		BusinessID:       publish.BusinessID,
		Priority:         priority,
		NextConsumeTime:  firstConsumeAt,
		ConsumeChance:    0,
		MaxConsumeChance: publish.MaxConsumeChance,
		UpdateID:         "",
	}
	// Insert through a pointer to the named result so that anything the
	// insert writes back into the model is visible to the caller.
	err = s.InsertModel(ctx, &message, QB{})
	return message, err
}
// PublishMessage publishes a message to queueName, inserting it directly
// through the database handle and using db.QueueTimeLocation for timestamps.
func (db *Database) PublishMessage(ctx context.Context, queueName string, publish Publish) (message Message, err error) {
	message, err = corePublishMessage(ctx, db.QueueTimeLocation, db, queueName, publish)
	return message, err
}
// PublishMessage publishes a message to queueName, inserting it through the
// transaction tx and using the owning database's QueueTimeLocation for
// timestamps.
func (tx *T) PublishMessage(ctx context.Context, queueName string, publish Publish) (message Message, err error) {
	message, err = corePublishMessage(ctx, tx.db.QueueTimeLocation, tx, queueName, publish)
	return message, err
}
// Consume configures a queue consumer run by Database.ConsumeMessage.
type Consume struct {
	// QueueName is the name of the queue to read messages from.
	QueueName string
	// HandleError receives every error raised while reading or handling
	// messages. It must not be nil.
	HandleError func(err error)
	// HandleMessage is called with each claimed message and reports what to
	// do with it. It must not be nil.
	HandleMessage func(message Message) MessageResult
	// NextConsumeTime returns a delay derived from the consume counters.
	// When nil, initAndCheck installs a function that always returns
	// time.Minute.
	// NOTE(review): presumably the delay before the next consume attempt;
	// it is not called in this file — confirm against its callers.
	NextConsumeTime func(consumeChance, maxConsumeChance uint16) time.Duration
	// queueTimeLocation is filled in by initAndCheck from
	// Database.QueueTimeLocation.
	queueTimeLocation *time.Location
}
// initAndCheck prepares data for use by Database.ConsumeMessage and validates
// it.
//
// It copies db.QueueTimeLocation into data and, when NextConsumeTime is nil,
// installs a default that always returns time.Minute. It returns an error
// when HandleMessage or HandleError is nil, or when QueueName is empty.
func (data *Consume) initAndCheck(db *Database) (err error) {
	data.queueTimeLocation = db.QueueTimeLocation
	if data.NextConsumeTime == nil {
		data.NextConsumeTime = func(consumeChance uint16, maxConsumeChance uint16) time.Duration {
			return time.Minute
		}
	}
	if data.HandleMessage == nil {
		return xerr.New("goclub/sql: Database{}.ConsumeMessage(ctx, consume) consume.HandleMessage can not be nil")
	}
	if data.HandleError == nil {
		return xerr.New("goclub/sql: Database{}.ConsumeMessage(ctx, consume) consume.HandleError can not be nil")
	}
	// Publishing rejects an empty queue name, so a consumer without one could
	// never receive a message; fail fast instead of polling forever.
	// Checked last so the pre-existing errors keep their precedence.
	if data.QueueName == "" {
		return xerr.New("goclub/sql: Database{}.ConsumeMessage(ctx, consume) consume.QueueName can not be empty string")
	}
	return nil
}
// InitQueue creates the two tables backing the queue queueName if they do not
// exist yet: `queue_<queueName>` for pending messages and
// `queue_<queueName>_dead_letter` for dead letters.
//
// queueName is spliced into back-quoted table identifiers (identifiers cannot
// be bound as statement parameters), so it must be non-empty and must not
// contain a backtick, which would terminate the quoted identifier. An error
// is returned for such names, or when either CREATE TABLE statement fails.
func (db *Database) InitQueue(ctx context.Context, queueName string) (err error) {
	if queueName == "" {
		return xerr.New("goclub/sql: Database{}.InitQueue(ctx, queueName) queueName can not be empty string")
	}
	// A byte scan is sufficient: 0x60 never occurs inside a multi-byte UTF-8
	// sequence.
	for i := 0; i < len(queueName); i++ {
		if queueName[i] == '`' {
			return xerr.New("goclub/sql: Database{}.InitQueue(ctx, queueName) queueName can not contain backtick")
		}
	}

	createQueueTableSQL := "CREATE TABLE IF NOT EXISTS `queue_" + queueName + "` (" + `
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
business_id bigint(20) unsigned NOT NULL,
priority tinyint(3) unsigned NOT NULL,
update_id char(24) NOT NULL,
consume_chance smallint(6) unsigned NOT NULL,
max_consume_chance smallint(6) unsigned NOT NULL,
next_consume_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
KEY business_id (business_id),
KEY update_id (update_id),
KEY next_consume_time__consume_chance__priority (next_consume_time,consume_chance,max_consume_chance,priority)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`
	if _, err = db.Exec(ctx, createQueueTableSQL, nil); err != nil {
		return err
	}

	createDeadLetterTableSQL := "CREATE TABLE IF NOT EXISTS `queue_" + queueName + "_dead_letter` (" + `
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
business_id bigint(20) unsigned NOT NULL,
reason varchar(255) NOT NULL DEFAULT '',
handled tinyint(3) unsigned NOT NULL,
handled_result varchar(255) NOT NULL,
create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
KEY business_id (business_id),
KEY create_time (create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`
	_, err = db.Exec(ctx, createDeadLetterTableSQL, nil)
	return err
}
// ConsumeMessage validates consume and then polls its queue until ctx is done.
//
// Each round tries to claim and handle one message. After a round that
// consumed a message the next round starts almost immediately (1ns); after an
// idle or failed round it waits one second. Errors from a round are passed to
// consume.HandleError and do not stop the loop.
//
// It returns the validation error from initAndCheck, or ctx.Err() once ctx is
// cancelled or its deadline passes.
func (db *Database) ConsumeMessage(ctx context.Context, consume Consume) error {
	if err := consume.initAndCheck(db); err != nil {
		return err
	}

	const (
		idleInterval = time.Second
		busyInterval = time.Nanosecond
	)
	// One reusable timer instead of a sleep so the wait can be interrupted
	// by ctx without allocating a timer per round.
	timer := time.NewTimer(idleInterval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
		}
		// select picks randomly when both cases are ready; re-check so a
		// cancelled context never triggers another database round.
		if err := ctx.Err(); err != nil {
			return err
		}

		consumed, err := db.tryReadQueueMessage(ctx, consume)
		if err != nil {
			consumed = false
			consume.HandleError(err)
		}

		// timer.C was drained by the select above, so Reset is safe here.
		if consumed {
			timer.Reset(busyInterval)
		} else {
			timer.Reset(idleInterval)
		}
	}
}
// tryReadQueueMessage tries to claim one due message from consume's queue and
// hands it to consume.HandleMessage.
//
// consumed reports whether a message was claimed and passed to the handler.
// err carries failures of the claim queries only; errors from the handler or
// from applying its result (ack / requeue / dead letter) are reported through
// consume.HandleError and are not returned.
func (db *Database) tryReadQueueMessage(ctx context.Context, consume Consume) (consumed bool, err error) {
	message := Message{QueueName: consume.QueueName}
	message.consume = consume

	// Fetch up to 10 candidate ids that are due and still have consume
	// chances left.
	var candidateIDs []uint64
	candidateQuery := QB{
		From:   &message,
		Select: []Column{"id"},
		Where: AndRaw(`next_consume_time <= ?`, time.Now().In(db.QueueTimeLocation)).
			AndRaw(`consume_chance < max_consume_chance`),
		OrderBy: []OrderBy{
			{"priority", DESC},
		},
		Limit: 10,
	}
	if err = db.QuerySliceScaner(ctx, candidateQuery, ScanUint64s(&candidateIDs)); err != nil {
		return false, err
	}
	// No candidates: skip the update.
	if len(candidateIDs) == 0 {
		return false, nil
	}

	// Claim one row in a concurrency-safe way by marking it with an UPDATE
	// (filtering by id so that concurrent transactions do not deadlock).
	updateID := NanoID24()
	claimQuery := QB{
		Index: "update_id",
		Set: Set("update_id", updateID).
			SetRaw(`consume_chance = consume_chance + ?`, 1).
			// Push the next consume time a fixed 10 minutes ahead first, so
			// that a later process interruption or SQL failure does not lead
			// to the message being consumed repeatedly.
			Set("next_consume_time", time.Now().In(db.QueueTimeLocation).Add(time.Minute*10)),
		Where: And("id", In(candidateIDs)).AndRaw("consume_chance < max_consume_chance"),
		OrderBy: []OrderBy{
			{"priority", DESC},
		},
		Limit: 1,
	}
	affected, err := db.UpdateAffected(ctx, &message, claimQuery)
	if err != nil {
		return false, err
	}
	// Nothing was updated: skip.
	if affected == 0 {
		return false, nil
	}

	// Load the full queue row that was just marked.
	found, err := db.Query(ctx, &message, QB{
		Where: And("update_id", Equal(updateID)),
		Limit: 1,
	})
	if err != nil {
		return false, err
	}
	if !found {
		return false, xerr.New("goclub/sql: unexpected: Database{}.ConsumeMessage(): update_id(" + updateID + ") should has")
	}

	result := consume.HandleMessage(message)
	if result.err != nil {
		consume.HandleError(result.err)
	}

	// Apply the handler's decision; the first flag set wins, in the order
	// ack, requeue, dead letter.
	switch {
	case result.ack:
		if ackErr := message.execAck(db); ackErr != nil {
			consume.HandleError(ackErr)
		}
	case result.requeue:
		if requeueErr := message.execRequeue(db, result.requeueDelay); requeueErr != nil {
			consume.HandleError(requeueErr)
		}
	case result.deadLetter:
		if deadLetterErr := message.execDeadLetter(db, result.deadLetterReason); deadLetterErr != nil {
			consume.HandleError(deadLetterErr)
		}
	default:
		consume.HandleError(xerr.New("consume.HandleMessage not allow return empty MessageRequest,messageID:" + strconv.FormatUint(message.ID, 10)))
	}
	return true, nil
}