Skip to content

Commit

Permalink
redis new implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ASKozienko committed Oct 29, 2018
1 parent 92d8bf5 commit cd98523
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 236 deletions.
41 changes: 41 additions & 0 deletions JsonSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

class JsonSerializer implements Serializer
{
public function toString(RedisMessage $message): string
{
$json = json_encode([
'body' => $message->getBody(),
'properties' => $message->getProperties(),
'headers' => $message->getHeaders(),
]);

if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return $json;
}

public function toMessage(string $string): RedisMessage
{
$data = json_decode($string, true);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return new RedisMessage($data['body'], $data['properties'], $data['headers']);
}
}
7 changes: 5 additions & 2 deletions RedisConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class RedisConnectionFactory implements ConnectionFactory
* 'read_write_timeout' => Timeout (expressed in seconds) used when performing read or write operations on the underlying network resource after which an exception is thrown.
* 'predis_options' => An array of predis specific options.
* 'ssl' => could be any of http://fi2.php.net/manual/en/context.ssl.php#refsect1-context.ssl-options
* 'redelivery_delay' => Default 300 sec. Returns back message into the queue if message was not acknowledged or rejected after this delay.
* It could happen if consumer has failed with fatal error or even if message processing is slow and takes more than this time.
* ].
*
* or
Expand Down Expand Up @@ -85,10 +87,10 @@ public function createContext(): Context
if ($this->config['lazy']) {
return new RedisContext(function () {
return $this->createRedis();
});
}, $this->config['redelivery_delay']);
}

return new RedisContext($this->createRedis());
return new RedisContext($this->createRedis(), $this->config['redelivery_delay']);
}

private function createRedis(): Redis
Expand Down Expand Up @@ -158,6 +160,7 @@ private function defaultConfig(): array
'read_write_timeout' => null,
'predis_options' => null,
'ssl' => null,
'redelivery_delay' => 300,
];
}
}
51 changes: 18 additions & 33 deletions RedisConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

class RedisConsumer implements Consumer
{
use RedisConsumerHelperTrait;

/**
* @var RedisDestination
*/
Expand All @@ -24,12 +26,7 @@ class RedisConsumer implements Consumer
/**
* @var int
*/
private $retryDelay;

/**
* @var RedisQueueConsumer
*/
private $queueConsumer;
private $redeliveryDelay = 300;

public function __construct(RedisContext $context, RedisDestination $queue)
{
Expand All @@ -40,21 +37,17 @@ public function __construct(RedisContext $context, RedisDestination $queue)
/**
* @return int
*/
public function getRetryDelay(): ?int
public function getRedeliveryDelay(): ?int
{
return $this->retryDelay;
return $this->redeliveryDelay;
}

/**
* @param int $retryDelay
* @param int $delay
*/
public function setRetryDelay(int $retryDelay): void
public function setRedeliveryDelay(int $delay): void
{
$this->retryDelay = $retryDelay;

if ($this->queueConsumer) {
$this->queueConsumer->setRetryDelay($this->retryDelay);
}
$this->redeliveryDelay = $delay;
}

/**
Expand All @@ -80,19 +73,15 @@ public function receive(int $timeout = 0): ?Message
}
}

$this->initQueueConsumer();

return $this->queueConsumer->receiveMessage($timeout);
return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay);
}

/**
* @return RedisMessage
*/
public function receiveNoWait(): ?Message
{
$this->initQueueConsumer();

return $this->queueConsumer->receiveMessageNoWait($this->queue);
return $this->receiveMessageNoWait($this->queue, $this->redeliveryDelay);
}

/**
Expand All @@ -113,30 +102,26 @@ public function reject(Message $message, bool $requeue = false): void
$this->acknowledge($message);

if ($requeue) {
$message = RedisMessage::jsonUnserialize($message->getReservedKey());
$message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey());
$message->setHeader('attempts', 0);

if ($message->getTimeToLive()) {
$message->setHeader('expires_at', time() + $message->getTimeToLive());
}

$this->getRedis()->lpush($this->queue->getName(), json_encode($message));
$payload = $this->getContext()->getSerializer()->toString($message);

$this->getRedis()->lpush($this->queue->getName(), $payload);
}
}

private function getRedis(): Redis
private function getContext(): RedisContext
{
return $this->context->getRedis();
return $this->context;
}

private function initQueueConsumer(): void
private function getRedis(): Redis
{
if (null === $this->queueConsumer) {
$this->queueConsumer = new RedisQueueConsumer($this->getRedis(), [$this->queue]);

if ($this->retryDelay) {
$this->queueConsumer->setRetryDelay($this->retryDelay);
}
}
return $this->context->getRedis();
}
}
116 changes: 116 additions & 0 deletions RedisConsumerHelperTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

trait RedisConsumerHelperTrait
{
/**
* @var string[]
*/
protected $queueNames;

abstract function getContext(): RedisContext;

/**
* @param RedisDestination[] $queues
* @param int $timeout
* @param int $redeliveryDelay
*
* @return RedisMessage|null
*/
protected function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage
{
$startAt = time();
$thisTimeout = $timeout;

if (null === $this->queueNames) {
$this->queueNames = [];
foreach ($queues as $queue) {
$this->queueNames[] = $queue->getName();
}
}

while ($thisTimeout > 0) {
$this->migrateExpiredMessages($this->queueNames);

if ($result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) {
$this->pushQueueNameBack($result->getKey());

if ($message = $this->processResult($result, $redeliveryDelay)) {
return $message;
}
}

$thisTimeout -= time() - $startAt;
}

return null;
}

protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage
{
$this->migrateExpiredMessages([$destination->getName()]);

if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) {
return $this->processResult($result, $redeliveryDelay);
}

return null;
}

protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage
{
$message = $this->getContext()->getSerializer()->toMessage($result->getMessage());

$now = time();

if ($expiresAt = $message->getHeader('expires_at')) {
if ($now > $expiresAt) {
return null;
}
}

$message->setHeader('attempts', $message->getAttempts() + 1);
$message->setRedelivered($message->getAttempts() > 1);
$message->setKey($result->getKey());
$message->setReservedKey($this->getContext()->getSerializer()->toString($message));

$reservedQueue = $result->getKey().':reserved';
$redeliveryAt = $now + $redeliveryDelay;

$this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt);

return $message;
}

protected function pushQueueNameBack(string $queueName): void
{
if (count($this->queueNames) <= 1) {
return;
}

if (false === $from = array_search($queueName, $this->queueNames, true)) {
throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName));
}

$to = count($this->queueNames) - 1;

$out = array_splice($this->queueNames, $from, 1);
array_splice($this->queueNames, $to, 0, $out);
}

protected function migrateExpiredMessages(array $queueNames): void
{
$now = time();

foreach ($queueNames as $queueName) {
$this->getContext()->getRedis()
->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]);

$this->getContext()->getRedis()
->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]);
}
}
}
25 changes: 21 additions & 4 deletions RedisContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

class RedisContext implements Context
{
use SerializerAwareTrait;

/**
* @var Redis
*/
Expand All @@ -27,12 +29,18 @@ class RedisContext implements Context
*/
private $redisFactory;

/**
* @var int
*/
private $redeliveryDelay = 300;

/**
* Callable must return instance of Redis once called.
*
* @param Redis|callable $redis
* @param int $redeliveryDelay
*/
public function __construct($redis)
public function __construct($redis, int $redeliveryDelay)
{
if ($redis instanceof Redis) {
$this->redis = $redis;
Expand All @@ -45,6 +53,9 @@ public function __construct($redis)
Redis::class
));
}

$this->redeliveryDelay = $redeliveryDelay;
$this->setSerializer(new JsonSerializer());
}

/**
Expand Down Expand Up @@ -101,7 +112,7 @@ public function createTemporaryQueue(): Queue
*/
public function createProducer(): Producer
{
return new RedisProducer($this->getRedis());
return new RedisProducer($this);
}

/**
Expand All @@ -113,15 +124,21 @@ public function createConsumer(Destination $destination): Consumer
{
InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class);

return new RedisConsumer($this, $destination);
$consumer = new RedisConsumer($this, $destination);
$consumer->setRedeliveryDelay($this->redeliveryDelay);

return $consumer;
}

/**
* @return RedisSubscriptionConsumer
*/
public function createSubscriptionConsumer(): SubscriptionConsumer
{
return new RedisSubscriptionConsumer($this);
$consumer = new RedisSubscriptionConsumer($this);
$consumer->setRedeliveryDelay($this->redeliveryDelay);

return $consumer;
}

/**
Expand Down
Loading

0 comments on commit cd98523

Please sign in to comment.