This repository has been archived by the owner on Jan 14, 2020. It is now read-only.
forked from PHPNuts/yii2-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
RedisQueue.php
124 lines (106 loc) · 2.91 KB
/
RedisQueue.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
<?php
/**
* @link http://www.yiiframework.com/
* @copyright Copyright (c) 2008 Yii Software LLC
* @license http://www.yiiframework.com/license/
*/
namespace yii\queue;
use Predis\Client;
use Predis\Transaction\MultiExec;
use Yii;
use yii\base\Component;
use yii\base\InvalidConfigException;
use yii\helpers\Json;
/**
* RedisQueue
*
* @author Alexander Kochetov <creocoder@gmail.com>
*/
class RedisQueue extends Component implements QueueInterface
{
/**
* @var Client|array
*/
public $redis;
/**
* @var integer
*/
public $expire = 60;
/**
* @inheritdoc
*/
public function init()
{
parent::init();
if ($this->redis === null) {
throw new InvalidConfigException('The "redis" property must be set.');
}
if (!$this->redis instanceof Client) {
$this->redis = new Client($this->redis);
}
}
/**
* @inheritdoc
*/
public function push($payload, $queue, $delay = 0)
{
$payload = Json::encode(['id' => $id = md5(uniqid('', true)), 'body' => $payload]);
if ($delay > 0) {
$this->redis->zadd($queue . ':delayed', [$payload => time() + $delay]);
} else {
$this->redis->rpush($queue, [$payload]);
}
return $id;
}
/**
* @inheritdoc
*/
public function pop($queue)
{
foreach ([':delayed', ':reserved'] as $type) {
$options = ['cas' => true, 'watch' => $queue . $type];
$this->redis->transaction($options, function (MultiExec $transaction) use ($queue, $type) {
$data = $this->redis->zrangebyscore($queue . $type, '-inf', $time = time());
if (!empty($data)) {
$transaction->zremrangebyscore($queue . $type, '-inf', $time);
$transaction->rpush($queue, $data);
}
});
}
$data = $this->redis->lpop($queue);
if ($data === null) {
return false;
}
$this->redis->zadd($queue . ':reserved', [$data => time() + $this->expire]);
$data = Json::decode($data);
return [
'id' => $data['id'],
'body' => $data['body'],
'queue' => $queue,
];
}
/**
* @inheritdoc
*/
public function purge($queue) {
$this->redis->del([$queue, $queue . ':delayed', $queue . ':reserved']);
}
/**
* @inheritdoc
*/
public function release(array $message, $delay = 0)
{
if ($delay > 0) {
$this->redis->zadd($message['queue'] . ':delayed', [$message['body'] => time() + $delay]);
} else {
$this->redis->rpush($message['queue'], [$message['body']]);
}
}
/**
* @inheritdoc
*/
public function delete(array $message)
{
$this->redis->zrem($message['queue'] . ':reserved', $message['body']);
}
}