Skip to content

Commit

Permalink
Merge pull request #74 from pdffiller/release/3.5.0
Browse files Browse the repository at this point in the history
Release/3.5.0
  • Loading branch information
osadchyi-s authored Feb 8, 2019
2 parents f88cd00 + 55029db commit c3fd059
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 25 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [3.5.0] - 2019-02-08
### Added
- Added possibility for synchronously job handle.
- Added possibility for push to empty subscribers (queues).
- Added failure getter to a job

## [3.4.0] - 2019-01-10
### Added
- Added failed flag.
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Documentation is borrowed from [seomoz/qless](https://github.com/seomoz/qless).
- [Event System](#event-system)
- [Per-Job Events](#per-job-events)
- [List of Events](#list-of-events)
- [Sync job processing](#sync-job-processing)
- [Heartbeating](#heartbeating)
- [Stats](#stats)
- [Time](#time)
Expand Down Expand Up @@ -716,7 +717,16 @@ Full list of events available in Qless:
| **Queue** | `queue:beforeEnqueue` | `\Qless\Events\User\Queue\BeforeEnqueue`
| **Queue** | `queue:afterEnqueue` | `\Qless\Events\User\Queue\AfterEnqueue`

### Sync job processing

If you want your job to be processed without worker, you can set sync mode for qless client. In configuration of your project write code like this:
```php
/** @var \Qless\Client $client */
$client->config->set('sync-enabled', true);
```
Now you all job will be process without worker, synchronously.

**Note**: Use it feature for testing your job in development environment.
### Heartbeating

**`@todo`**
Expand Down
1 change: 1 addition & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* @method string recur(string $queue, string $jid, string $klass, string $data, string $spec, ...$args)
* @method string requeue(string $worker, string $queue, string $jid, string $klass, string $data, int $delay, ...$args)
* @method string pop(string $queue, string $worker, int $count)
* @method string popByJid(string $queue, string $jid, string $worker)
* @method int length(string $queue)
* @method float heartbeat(...$args)
* @method int retry(string $jid, string $queue, string $worker, int $delay, string $group, string $message)
Expand Down
8 changes: 8 additions & 0 deletions src/Jobs/AbstractJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ public function getData(): JobData
return $this->data;
}

/**
* @return array
*/
public function getFailure(): array
{
return $this->rawData['failure'] ?? [];
}

/**
* Add the specified tags to this job.
*
Expand Down
17 changes: 7 additions & 10 deletions src/Queues/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,17 @@ public function fromSubscriptions(string $topic): array
return $response;
}

$queues = json_decode($this->client->queues(), true) ?: [];
$subscriptions = $this->client->call('subscription', 'default', 'all', $topic);
$subscriptions = json_decode($subscriptions, true) ?: [];

foreach ($queues as $queue) {
$subscriptions = $this->client->call('subscription', $queue['name'], 'get');
$subscriptions = json_decode($subscriptions, true) ?: [];
foreach ($subscriptions as $subscription) {
$topicPattern = str_replace(['.', '*', '#'], ['\.', '[a-zA-z0-9^.]{1,}', '.*'], $subscription);
if (preg_match("/^$topicPattern$/", $topic)) {
$response[] = $queue['name'];
}
foreach ($subscriptions as $subscription => $queues) {
$topicPattern = str_replace(['.', '*', '#'], ['\.', '[a-zA-z0-9^.]{1,}', '.*'], $subscription);
if (preg_match("/^$topicPattern$/", $topic)) {
$response = array_merge($response, $queues);
}
}

return $response;
return array_unique($response);
}

/**
Expand Down
39 changes: 39 additions & 0 deletions src/Queues/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public function __construct(string $name, Client $client)
$this->name = $name;

$this->setEventsManager($this->client->getEventsManager());
$this->registerSyncCompleteEvent();
}

/**
Expand Down Expand Up @@ -143,6 +144,27 @@ public function pop(?string $worker = null, ?int $numJobs = null)
return $numJobs === null ? array_shift($jobs) : $jobs;
}

/**
* Get job by JID from this queue.
*
* @param string $jid
* @param string|null $worker
* @return BaseJob|null
*/
public function popByJid(string $jid, ?string $worker = null): ?BaseJob
{
$workerName = $worker ?: $this->client->getWorkerName();
$data = json_decode($this->client->popByJid($this->name, $jid, $workerName), true);
$jobData = array_reduce($data, 'array_merge', []); //unwrap nested array

if ($jobData['jid'] === $jid) {
$job = new BaseJob($this->client, $jobData);
$job->setEventsManager($this->getEventsManager());
}

return $job ?? null;
}

/**
* Make a recurring job in this queue.
*
Expand Down Expand Up @@ -405,6 +427,23 @@ public function unSubscribe(string $topicPattern): bool
return $this->client->subscription($this->name, 'remove', $topicPattern) == 'true';
}

/**
* Immediately handle job if sync mode enabled
*/
private function registerSyncCompleteEvent(): void
{
$this->getEventsManager()
->attach(QueueEvent\AfterEnqueue::getName(), function (QueueEvent\AfterEnqueue $event) {
if (!$this->client->config->get('sync-enabled')) {
return;
}
$job = $this->popByJid($event->getJid());
if (!empty($job)) {
$job->perform();
}
});
}

/**
* Gets this queue name.
*
Expand Down
2 changes: 1 addition & 1 deletion src/Topics/Topic.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/**
* Class Topic
* @package Qless\Topics
* @method string put(string $class, array $data, ?string $jid = null, ...$args)
* @method array put(string $class, array $data, ?string $jid = null, ...$args)
* @method BaseJob pop(?string $worker = null, int $numJobs = 0)
*/
class Topic
Expand Down
71 changes: 67 additions & 4 deletions src/qless-core/qless.lua
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,27 @@ end

function Qless.subscription(now, queue, command, topic)
assert(command,
'Tag(): Arg "command" must be "add", "remove", "get", "exists"')
'Tag(): Arg "command" must be "add", "remove", "get", "all", "exists"')

if command == 'get' then
return redis.call('hvals', 'ql:topics' .. queue)
return redis.call('keys', 'ql:topics' .. queue)
elseif command == 'all' then
local topics = {}
local _topics = redis.call('keys', 'ql:topics*')
local _value = {}
local _topic
local _return = {}
for i,v in ipairs(_topics) do
_value = redis.call('hvals', v)
_topic = redis.call('hkeys', v)
for index,topic_name in ipairs(_topic) do
if (_return[topic_name] == nil) then
_return[topic_name] = {}
end
_return[topic_name][index] = _value[index]
end
end
return _return
elseif command == 'exists' then
local exists = redis.call('hexists', 'ql:topics' .. queue, topic)
return (exists == 1)
Expand All @@ -156,10 +173,10 @@ function Qless.subscription(now, queue, command, topic)
for i,v in ipairs(topics) do _topics[v] = true end

if _topics[topic] == nil or _topics[topic] == false then
redis.call('hset', 'ql:topics' .. queue, topic, topic)
redis.call('hset', 'ql:topics' .. queue, topic, queue)
end
else
redis.call('hset', 'ql:topics' .. queue, topic, topic)
redis.call('hset', 'ql:topics' .. queue, topic, queue)
table.insert(topics, topic)
end

Expand Down Expand Up @@ -1241,6 +1258,45 @@ function QlessQueue:pop(now, worker, count)
return jids
end

function QlessQueue:popByJid(now ,jid, worker)
assert(jid, 'Pop(): Arg "jid" missing')
assert(worker, 'Pop(): Arg "worker" missing')

local expires = now + tonumber(
Qless.config.get(self.name .. '-heartbeat') or
Qless.config.get('heartbeat', 60))

redis.call('zadd', 'ql:workers', now, worker)

local job = Qless.job(jid)

job:history(now, 'popped', {worker = worker})

local time = tonumber(
redis.call('hget', QlessJob.ns .. jid, 'time') or now)
local waiting = now - time
self:stat(now, 'wait', waiting)
redis.call('hset', QlessJob.ns .. jid,
'time', string.format("%.20f", now))

redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid)

job:update({
worker = worker,
expires = expires,
state = 'running'
})

self.locks.add(expires, jid)

local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false

if tracked then
Qless.publish('popped', jid)
end

return jid
end
function QlessQueue:stat(now, stat, val)
local bin = now - (now % 86400)
local key = 'ql:s:' .. stat .. ':' .. bin .. ':' .. self.name
Expand Down Expand Up @@ -1975,6 +2031,13 @@ QlessAPI.pop = function(now, queue, worker, count)
return cjson.encode(response)
end

QlessAPI.popByJid = function(now, queue, jid, worker)
local jid = Qless.queue(queue):popByJid(now, jid, worker)
local response = {}
table.insert(response, Qless.job(jid):data())
return cjson.encode(response)
end

QlessAPI.pause = function(now, ...)
return QlessQueue.pause(now, unpack(arg))
end
Expand Down
19 changes: 19 additions & 0 deletions tests/Jobs/BaseJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -568,4 +568,23 @@ public function testThrowsInvalidJobExceptionWhenRequeuingCancelledJob()
$this->client->cancel('jid-1');
$job->requeue();
}

public function testJobCanCompleteSync()
{
$queue = $this->client->queues['test-queue'];

$queue->put(JobHandler::class, []);

$this->client->config->set('sync-enabled', true);

$jid = $queue->put(JobHandler::class, []);

$job = $this->client->jobs[$jid];

$this->client->config->clear('sync-enabled');

$this->assertIsJob($job);
$this->assertArrayHasKey('stack', $job->data->toArray());
$this->assertFalse($job->getFailed());
}
}
17 changes: 7 additions & 10 deletions tests/Topics/TopicTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ public function shouldGetQueuesBySubscription()
$queues = [];
for ($i = 1; $i<=2; $i++) {
$queues[$i] = new Queue('test-queue-' . $i, $this->client);
$queues[$i]->put('Zzz\Yyy', []);
$queues[$i]->pop();
}

$queues[1]->subscribe('big.*.*');
$queues[2]->subscribe('big.*.apples');

$queuesCollection = new Collection($this->client);

$this->assertEquals(['test-queue-1', 'test-queue-2'], $queuesCollection->fromSubscriptions('big.green.apples'));
$this->assertEquals(['test-queue-1', 'test-queue-2'], $queuesCollection->fromSubscriptions('big.red.apples'));
$this->assertEquals(['test-queue-2', 'test-queue-1'], $queuesCollection->fromSubscriptions('big.green.apples'));
$this->assertEquals(['test-queue-2', 'test-queue-1'], $queuesCollection->fromSubscriptions('big.red.apples'));
$this->assertEquals([], $queuesCollection->fromSubscriptions('*.*.oranges'));

$queues[1]->unsubscribe('big.*.*');
$queues[2]->unsubscribe('big.*.apples');
}

/** @test */
Expand All @@ -38,8 +39,6 @@ public function shouldQueueSubscribe()
$queues = [];
for ($i = 1; $i<=5; $i++) {
$queues[$i] = new Queue('test-queue-' . $i, $this->client);
$queues[$i]->put('Zzz\Yyy', []);
$queues[$i]->pop();
}

$queues[1]->subscribe('big.*.*');
Expand All @@ -61,7 +60,7 @@ public function shouldQueueSubscribe()
$this->assertEquals('Xxx\Yyy', $job1->getKlass());
$this->assertEquals('Xxx\Yyy', $job2->getKlass());
$this->assertEquals('Xxx\Yyy', $job3->getKlass());
$this->assertEquals(null, $job4);
$this->assertEmpty($job4);
$this->assertEquals('Xxx\Yyy', $job5->getKlass());
}

Expand All @@ -71,8 +70,6 @@ public function shouldQueueUnSubscribe()
$queues = [];
for ($i = 1; $i<=3; $i++) {
$queues[$i] = new Queue('test-queue-' . $i, $this->client);
$queues[$i]->put('Zzz\Yyy', []);
$queues[$i]->pop();
}

$queues[1]->subscribe('big.*.*');
Expand All @@ -91,7 +88,7 @@ public function shouldQueueUnSubscribe()
$job3 = $queues[3]->pop();

$this->assertEquals('Xxx\Yyy', $job1->getKlass());
$this->assertEquals(null, $job2);
$this->assertEmpty($job2);
$this->assertEquals('Xxx\Yyy', $job3->getKlass());
}
}
Empty file added tests/publish.php.log
Empty file.

0 comments on commit c3fd059

Please sign in to comment.