diff --git a/CHANGELOG.md b/CHANGELOG.md index 588f306..20bea64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [3.5.0] - 2019-02-08 +### Added +- Added possibility for synchronously job handle. +- Added possibility for push to empty subscribers (queues). +- Added failure getter to a job + ## [3.4.0] - 2019-01-10 ### Added - Added failed flag. diff --git a/README.md b/README.md index 06f359c..6c69ae3 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ Documentation is borrowed from [seomoz/qless](https://github.com/seomoz/qless). - [Event System](#event-system) - [Per-Job Events](#per-job-events) - [List of Events](#list-of-events) + - [Sync job processing](#sync-job-processing) - [Heartbeating](#heartbeating) - [Stats](#stats) - [Time](#time) @@ -716,7 +717,16 @@ Full list of events available in Qless: | **Queue** | `queue:beforeEnqueue` | `\Qless\Events\User\Queue\BeforeEnqueue` | **Queue** | `queue:afterEnqueue` | `\Qless\Events\User\Queue\AfterEnqueue` +### Sync job processing +If you want your job to be processed without worker, you can set sync mode for qless client. In configuration of your project write code like this: +```php +/** @var \Qless\Client $client */ +$client->config->set('sync-enabled', true); +``` +Now you all job will be process without worker, synchronously. + +**Note**: Use it feature for testing your job in development environment. ### Heartbeating **`@todo`** diff --git a/src/Client.php b/src/Client.php index 354002d..784e409 100644 --- a/src/Client.php +++ b/src/Client.php @@ -21,6 +21,7 @@ * @method string recur(string $queue, string $jid, string $klass, string $data, string $spec, ...$args) * @method string requeue(string $worker, string $queue, string $jid, string $klass, string $data, int $delay, ...$args) * @method string pop(string $queue, string $worker, int $count) + * @method string popByJid(string $queue, string $jid, string $worker) * @method int length(string $queue) * @method float heartbeat(...$args) * @method int retry(string $jid, string $queue, string $worker, int $delay, string $group, string $message) diff --git a/src/Jobs/AbstractJob.php b/src/Jobs/AbstractJob.php index b777605..69a9719 100644 --- a/src/Jobs/AbstractJob.php +++ b/src/Jobs/AbstractJob.php @@ -181,6 +181,14 @@ public function getData(): JobData return $this->data; } + /** + * @return array + */ + public function getFailure(): array + { + return $this->rawData['failure'] ?? []; + } + /** * Add the specified tags to this job. * diff --git a/src/Queues/Collection.php b/src/Queues/Collection.php index 0b0f308..c29043a 100644 --- a/src/Queues/Collection.php +++ b/src/Queues/Collection.php @@ -80,20 +80,17 @@ public function fromSubscriptions(string $topic): array return $response; } - $queues = json_decode($this->client->queues(), true) ?: []; + $subscriptions = $this->client->call('subscription', 'default', 'all', $topic); + $subscriptions = json_decode($subscriptions, true) ?: []; - foreach ($queues as $queue) { - $subscriptions = $this->client->call('subscription', $queue['name'], 'get'); - $subscriptions = json_decode($subscriptions, true) ?: []; - foreach ($subscriptions as $subscription) { - $topicPattern = str_replace(['.', '*', '#'], ['\.', '[a-zA-z0-9^.]{1,}', '.*'], $subscription); - if (preg_match("/^$topicPattern$/", $topic)) { - $response[] = $queue['name']; - } + foreach ($subscriptions as $subscription => $queues) { + $topicPattern = str_replace(['.', '*', '#'], ['\.', '[a-zA-z0-9^.]{1,}', '.*'], $subscription); + if (preg_match("/^$topicPattern$/", $topic)) { + $response = array_merge($response, $queues); } } - return $response; + return array_unique($response); } /** diff --git a/src/Queues/Queue.php b/src/Queues/Queue.php index 0559c5e..866d61f 100644 --- a/src/Queues/Queue.php +++ b/src/Queues/Queue.php @@ -43,6 +43,7 @@ public function __construct(string $name, Client $client) $this->name = $name; $this->setEventsManager($this->client->getEventsManager()); + $this->registerSyncCompleteEvent(); } /** @@ -143,6 +144,27 @@ public function pop(?string $worker = null, ?int $numJobs = null) return $numJobs === null ? array_shift($jobs) : $jobs; } + /** + * Get job by JID from this queue. + * + * @param string $jid + * @param string|null $worker + * @return BaseJob|null + */ + public function popByJid(string $jid, ?string $worker = null): ?BaseJob + { + $workerName = $worker ?: $this->client->getWorkerName(); + $data = json_decode($this->client->popByJid($this->name, $jid, $workerName), true); + $jobData = array_reduce($data, 'array_merge', []); //unwrap nested array + + if ($jobData['jid'] === $jid) { + $job = new BaseJob($this->client, $jobData); + $job->setEventsManager($this->getEventsManager()); + } + + return $job ?? null; + } + /** * Make a recurring job in this queue. * @@ -405,6 +427,23 @@ public function unSubscribe(string $topicPattern): bool return $this->client->subscription($this->name, 'remove', $topicPattern) == 'true'; } + /** + * Immediately handle job if sync mode enabled + */ + private function registerSyncCompleteEvent(): void + { + $this->getEventsManager() + ->attach(QueueEvent\AfterEnqueue::getName(), function (QueueEvent\AfterEnqueue $event) { + if (!$this->client->config->get('sync-enabled')) { + return; + } + $job = $this->popByJid($event->getJid()); + if (!empty($job)) { + $job->perform(); + } + }); + } + /** * Gets this queue name. * diff --git a/src/Topics/Topic.php b/src/Topics/Topic.php index 53f3892..01a6a83 100644 --- a/src/Topics/Topic.php +++ b/src/Topics/Topic.php @@ -10,7 +10,7 @@ /** * Class Topic * @package Qless\Topics - * @method string put(string $class, array $data, ?string $jid = null, ...$args) + * @method array put(string $class, array $data, ?string $jid = null, ...$args) * @method BaseJob pop(?string $worker = null, int $numJobs = 0) */ class Topic diff --git a/src/qless-core/qless.lua b/src/qless-core/qless.lua index 4ecca37..4856144 100644 --- a/src/qless-core/qless.lua +++ b/src/qless-core/qless.lua @@ -138,10 +138,27 @@ end function Qless.subscription(now, queue, command, topic) assert(command, - 'Tag(): Arg "command" must be "add", "remove", "get", "exists"') + 'Tag(): Arg "command" must be "add", "remove", "get", "all", "exists"') if command == 'get' then - return redis.call('hvals', 'ql:topics' .. queue) + return redis.call('keys', 'ql:topics' .. queue) + elseif command == 'all' then + local topics = {} + local _topics = redis.call('keys', 'ql:topics*') + local _value = {} + local _topic + local _return = {} + for i,v in ipairs(_topics) do + _value = redis.call('hvals', v) + _topic = redis.call('hkeys', v) + for index,topic_name in ipairs(_topic) do + if (_return[topic_name] == nil) then + _return[topic_name] = {} + end + _return[topic_name][index] = _value[index] + end + end + return _return elseif command == 'exists' then local exists = redis.call('hexists', 'ql:topics' .. queue, topic) return (exists == 1) @@ -156,10 +173,10 @@ function Qless.subscription(now, queue, command, topic) for i,v in ipairs(topics) do _topics[v] = true end if _topics[topic] == nil or _topics[topic] == false then - redis.call('hset', 'ql:topics' .. queue, topic, topic) + redis.call('hset', 'ql:topics' .. queue, topic, queue) end else - redis.call('hset', 'ql:topics' .. queue, topic, topic) + redis.call('hset', 'ql:topics' .. queue, topic, queue) table.insert(topics, topic) end @@ -1241,6 +1258,45 @@ function QlessQueue:pop(now, worker, count) return jids end +function QlessQueue:popByJid(now ,jid, worker) + assert(jid, 'Pop(): Arg "jid" missing') + assert(worker, 'Pop(): Arg "worker" missing') + + local expires = now + tonumber( + Qless.config.get(self.name .. '-heartbeat') or + Qless.config.get('heartbeat', 60)) + + redis.call('zadd', 'ql:workers', now, worker) + + local job = Qless.job(jid) + + job:history(now, 'popped', {worker = worker}) + + local time = tonumber( + redis.call('hget', QlessJob.ns .. jid, 'time') or now) + local waiting = now - time + self:stat(now, 'wait', waiting) + redis.call('hset', QlessJob.ns .. jid, + 'time', string.format("%.20f", now)) + + redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid) + + job:update({ + worker = worker, + expires = expires, + state = 'running' + }) + + self.locks.add(expires, jid) + + local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false + + if tracked then + Qless.publish('popped', jid) + end + + return jid +end function QlessQueue:stat(now, stat, val) local bin = now - (now % 86400) local key = 'ql:s:' .. stat .. ':' .. bin .. ':' .. self.name @@ -1975,6 +2031,13 @@ QlessAPI.pop = function(now, queue, worker, count) return cjson.encode(response) end +QlessAPI.popByJid = function(now, queue, jid, worker) + local jid = Qless.queue(queue):popByJid(now, jid, worker) + local response = {} + table.insert(response, Qless.job(jid):data()) + return cjson.encode(response) +end + QlessAPI.pause = function(now, ...) return QlessQueue.pause(now, unpack(arg)) end diff --git a/tests/Jobs/BaseJobTest.php b/tests/Jobs/BaseJobTest.php index 697df0e..9675829 100644 --- a/tests/Jobs/BaseJobTest.php +++ b/tests/Jobs/BaseJobTest.php @@ -568,4 +568,23 @@ public function testThrowsInvalidJobExceptionWhenRequeuingCancelledJob() $this->client->cancel('jid-1'); $job->requeue(); } + + public function testJobCanCompleteSync() + { + $queue = $this->client->queues['test-queue']; + + $queue->put(JobHandler::class, []); + + $this->client->config->set('sync-enabled', true); + + $jid = $queue->put(JobHandler::class, []); + + $job = $this->client->jobs[$jid]; + + $this->client->config->clear('sync-enabled'); + + $this->assertIsJob($job); + $this->assertArrayHasKey('stack', $job->data->toArray()); + $this->assertFalse($job->getFailed()); + } } diff --git a/tests/Topics/TopicTest.php b/tests/Topics/TopicTest.php index b5663f8..7ebf395 100644 --- a/tests/Topics/TopicTest.php +++ b/tests/Topics/TopicTest.php @@ -18,8 +18,6 @@ public function shouldGetQueuesBySubscription() $queues = []; for ($i = 1; $i<=2; $i++) { $queues[$i] = new Queue('test-queue-' . $i, $this->client); - $queues[$i]->put('Zzz\Yyy', []); - $queues[$i]->pop(); } $queues[1]->subscribe('big.*.*'); @@ -27,9 +25,12 @@ public function shouldGetQueuesBySubscription() $queuesCollection = new Collection($this->client); - $this->assertEquals(['test-queue-1', 'test-queue-2'], $queuesCollection->fromSubscriptions('big.green.apples')); - $this->assertEquals(['test-queue-1', 'test-queue-2'], $queuesCollection->fromSubscriptions('big.red.apples')); + $this->assertEquals(['test-queue-2', 'test-queue-1'], $queuesCollection->fromSubscriptions('big.green.apples')); + $this->assertEquals(['test-queue-2', 'test-queue-1'], $queuesCollection->fromSubscriptions('big.red.apples')); $this->assertEquals([], $queuesCollection->fromSubscriptions('*.*.oranges')); + + $queues[1]->unsubscribe('big.*.*'); + $queues[2]->unsubscribe('big.*.apples'); } /** @test */ @@ -38,8 +39,6 @@ public function shouldQueueSubscribe() $queues = []; for ($i = 1; $i<=5; $i++) { $queues[$i] = new Queue('test-queue-' . $i, $this->client); - $queues[$i]->put('Zzz\Yyy', []); - $queues[$i]->pop(); } $queues[1]->subscribe('big.*.*'); @@ -61,7 +60,7 @@ public function shouldQueueSubscribe() $this->assertEquals('Xxx\Yyy', $job1->getKlass()); $this->assertEquals('Xxx\Yyy', $job2->getKlass()); $this->assertEquals('Xxx\Yyy', $job3->getKlass()); - $this->assertEquals(null, $job4); + $this->assertEmpty($job4); $this->assertEquals('Xxx\Yyy', $job5->getKlass()); } @@ -71,8 +70,6 @@ public function shouldQueueUnSubscribe() $queues = []; for ($i = 1; $i<=3; $i++) { $queues[$i] = new Queue('test-queue-' . $i, $this->client); - $queues[$i]->put('Zzz\Yyy', []); - $queues[$i]->pop(); } $queues[1]->subscribe('big.*.*'); @@ -91,7 +88,7 @@ public function shouldQueueUnSubscribe() $job3 = $queues[3]->pop(); $this->assertEquals('Xxx\Yyy', $job1->getKlass()); - $this->assertEquals(null, $job2); + $this->assertEmpty($job2); $this->assertEquals('Xxx\Yyy', $job3->getKlass()); } } diff --git a/tests/publish.php.log b/tests/publish.php.log new file mode 100644 index 0000000..e69de29