diff --git a/JsonSerializer.php b/JsonSerializer.php new file mode 100644 index 0000000..7e064a2 --- /dev/null +++ b/JsonSerializer.php @@ -0,0 +1,41 @@ + $message->getBody(), + 'properties' => $message->getProperties(), + 'headers' => $message->getHeaders(), + ]); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return $json; + } + + public function toMessage(string $string): RedisMessage + { + $data = json_decode($string, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new RedisMessage($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/RedisConnectionFactory.php b/RedisConnectionFactory.php index 5d5ec3b..eee9d48 100644 --- a/RedisConnectionFactory.php +++ b/RedisConnectionFactory.php @@ -38,6 +38,8 @@ class RedisConnectionFactory implements ConnectionFactory * 'read_write_timeout' => Timeout (expressed in seconds) used when performing read or write operations on the underlying network resource after which an exception is thrown. * 'predis_options' => An array of predis specific options. * 'ssl' => could be any of http://fi2.php.net/manual/en/context.ssl.php#refsect1-context.ssl-options + * 'redelivery_delay' => Default 300 sec. Returns back message into the queue if message was not acknowledged or rejected after this delay. + * It could happen if consumer has failed with fatal error or even if message processing is slow and takes more than this time. * ]. * * or @@ -85,10 +87,10 @@ public function createContext(): Context if ($this->config['lazy']) { return new RedisContext(function () { return $this->createRedis(); - }); + }, $this->config['redelivery_delay']); } - return new RedisContext($this->createRedis()); + return new RedisContext($this->createRedis(), $this->config['redelivery_delay']); } private function createRedis(): Redis @@ -158,6 +160,7 @@ private function defaultConfig(): array 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ]; } } diff --git a/RedisConsumer.php b/RedisConsumer.php index 4722d42..6ff23f4 100644 --- a/RedisConsumer.php +++ b/RedisConsumer.php @@ -11,6 +11,8 @@ class RedisConsumer implements Consumer { + use RedisConsumerHelperTrait; + /** * @var RedisDestination */ @@ -24,12 +26,7 @@ class RedisConsumer implements Consumer /** * @var int */ - private $retryDelay; - - /** - * @var RedisQueueConsumer - */ - private $queueConsumer; + private $redeliveryDelay = 300; public function __construct(RedisContext $context, RedisDestination $queue) { @@ -40,21 +37,17 @@ public function __construct(RedisContext $context, RedisDestination $queue) /** * @return int */ - public function getRetryDelay(): ?int + public function getRedeliveryDelay(): ?int { - return $this->retryDelay; + return $this->redeliveryDelay; } /** - * @param int $retryDelay + * @param int $delay */ - public function setRetryDelay(int $retryDelay): void + public function setRedeliveryDelay(int $delay): void { - $this->retryDelay = $retryDelay; - - if ($this->queueConsumer) { - $this->queueConsumer->setRetryDelay($this->retryDelay); - } + $this->redeliveryDelay = $delay; } /** @@ -80,9 +73,7 @@ public function receive(int $timeout = 0): ?Message } } - $this->initQueueConsumer(); - - return $this->queueConsumer->receiveMessage($timeout); + return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay); } /** @@ -90,9 +81,7 @@ public function receive(int $timeout = 0): ?Message */ public function receiveNoWait(): ?Message { - $this->initQueueConsumer(); - - return $this->queueConsumer->receiveMessageNoWait($this->queue); + return $this->receiveMessageNoWait($this->queue, $this->redeliveryDelay); } /** @@ -113,30 +102,26 @@ public function reject(Message $message, bool $requeue = false): void $this->acknowledge($message); if ($requeue) { - $message = RedisMessage::jsonUnserialize($message->getReservedKey()); + $message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey()); $message->setHeader('attempts', 0); if ($message->getTimeToLive()) { $message->setHeader('expires_at', time() + $message->getTimeToLive()); } - $this->getRedis()->lpush($this->queue->getName(), json_encode($message)); + $payload = $this->getContext()->getSerializer()->toString($message); + + $this->getRedis()->lpush($this->queue->getName(), $payload); } } - private function getRedis(): Redis + private function getContext(): RedisContext { - return $this->context->getRedis(); + return $this->context; } - private function initQueueConsumer(): void + private function getRedis(): Redis { - if (null === $this->queueConsumer) { - $this->queueConsumer = new RedisQueueConsumer($this->getRedis(), [$this->queue]); - - if ($this->retryDelay) { - $this->queueConsumer->setRetryDelay($this->retryDelay); - } - } + return $this->context->getRedis(); } } diff --git a/RedisConsumerHelperTrait.php b/RedisConsumerHelperTrait.php new file mode 100644 index 0000000..41fab59 --- /dev/null +++ b/RedisConsumerHelperTrait.php @@ -0,0 +1,116 @@ +queueNames) { + $this->queueNames = []; + foreach ($queues as $queue) { + $this->queueNames[] = $queue->getName(); + } + } + + while ($thisTimeout > 0) { + $this->migrateExpiredMessages($this->queueNames); + + if ($result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) { + $this->pushQueueNameBack($result->getKey()); + + if ($message = $this->processResult($result, $redeliveryDelay)) { + return $message; + } + } + + $thisTimeout -= time() - $startAt; + } + + return null; + } + + protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage + { + $this->migrateExpiredMessages([$destination->getName()]); + + if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) { + return $this->processResult($result, $redeliveryDelay); + } + + return null; + } + + protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage + { + $message = $this->getContext()->getSerializer()->toMessage($result->getMessage()); + + $now = time(); + + if ($expiresAt = $message->getHeader('expires_at')) { + if ($now > $expiresAt) { + return null; + } + } + + $message->setHeader('attempts', $message->getAttempts() + 1); + $message->setRedelivered($message->getAttempts() > 1); + $message->setKey($result->getKey()); + $message->setReservedKey($this->getContext()->getSerializer()->toString($message)); + + $reservedQueue = $result->getKey().':reserved'; + $redeliveryAt = $now + $redeliveryDelay; + + $this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt); + + return $message; + } + + protected function pushQueueNameBack(string $queueName): void + { + if (count($this->queueNames) <= 1) { + return; + } + + if (false === $from = array_search($queueName, $this->queueNames, true)) { + throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); + } + + $to = count($this->queueNames) - 1; + + $out = array_splice($this->queueNames, $from, 1); + array_splice($this->queueNames, $to, 0, $out); + } + + protected function migrateExpiredMessages(array $queueNames): void + { + $now = time(); + + foreach ($queueNames as $queueName) { + $this->getContext()->getRedis() + ->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); + + $this->getContext()->getRedis() + ->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); + } + } +} diff --git a/RedisContext.php b/RedisContext.php index 2c3b468..344bb20 100644 --- a/RedisContext.php +++ b/RedisContext.php @@ -17,6 +17,8 @@ class RedisContext implements Context { + use SerializerAwareTrait; + /** * @var Redis */ @@ -27,12 +29,18 @@ class RedisContext implements Context */ private $redisFactory; + /** + * @var int + */ + private $redeliveryDelay = 300; + /** * Callable must return instance of Redis once called. * * @param Redis|callable $redis + * @param int $redeliveryDelay */ - public function __construct($redis) + public function __construct($redis, int $redeliveryDelay) { if ($redis instanceof Redis) { $this->redis = $redis; @@ -45,6 +53,9 @@ public function __construct($redis) Redis::class )); } + + $this->redeliveryDelay = $redeliveryDelay; + $this->setSerializer(new JsonSerializer()); } /** @@ -101,7 +112,7 @@ public function createTemporaryQueue(): Queue */ public function createProducer(): Producer { - return new RedisProducer($this->getRedis()); + return new RedisProducer($this); } /** @@ -113,7 +124,10 @@ public function createConsumer(Destination $destination): Consumer { InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); - return new RedisConsumer($this, $destination); + $consumer = new RedisConsumer($this, $destination); + $consumer->setRedeliveryDelay($this->redeliveryDelay); + + return $consumer; } /** @@ -121,7 +135,10 @@ public function createConsumer(Destination $destination): Consumer */ public function createSubscriptionConsumer(): SubscriptionConsumer { - return new RedisSubscriptionConsumer($this); + $consumer = new RedisSubscriptionConsumer($this); + $consumer->setRedeliveryDelay($this->redeliveryDelay); + + return $consumer; } /** diff --git a/RedisMessage.php b/RedisMessage.php index 5b23598..74c6547 100644 --- a/RedisMessage.php +++ b/RedisMessage.php @@ -6,7 +6,7 @@ use Interop\Queue\Message; -class RedisMessage implements Message, \JsonSerializable +class RedisMessage implements Message { /** * @var string @@ -213,31 +213,8 @@ public function getKey(): ?string /** * @param string $key */ - public function setKey(string $key) + public function setKey(string $key): void { $this->key = $key; } - - public function jsonSerialize(): array - { - return [ - 'body' => $this->getBody(), - 'properties' => $this->getProperties(), - 'headers' => $this->getHeaders(), - ]; - } - - public static function jsonUnserialize(string $json): self - { - $data = json_decode($json, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); - } - - return new self($data['body'], $data['properties'], $data['headers']); - } } diff --git a/RedisProducer.php b/RedisProducer.php index 3b3b138..5272615 100644 --- a/RedisProducer.php +++ b/RedisProducer.php @@ -14,9 +14,9 @@ class RedisProducer implements Producer { /** - * @var Redis + * @var RedisContext */ - private $redis; + private $context; /** * @var int|null @@ -29,11 +29,11 @@ class RedisProducer implements Producer private $deliveryDelay; /** - * @param Redis $redis + * @param RedisContext $context */ - public function __construct(Redis $redis) + public function __construct(RedisContext $context) { - $this->redis = $redis; + $this->context = $context; } /** @@ -60,11 +60,13 @@ public function send(Destination $destination, Message $message): void $message->setHeader('expires_at', time() + $message->getTimeToLive()); } + $payload = $this->context->getSerializer()->toString($message); + if ($message->getDeliveryDelay()) { $deliveryAt = time() + $message->getDeliveryDelay(); - $this->redis->zadd($destination->getName().':delayed', json_encode($message), $deliveryAt); + $this->context->getRedis()->zadd($destination->getName().':delayed', $payload, $deliveryAt); } else { - $this->redis->lpush($destination->getName(), json_encode($message)); + $this->context->getRedis()->lpush($destination->getName(), $payload); } } diff --git a/RedisQueueConsumer.php b/RedisQueueConsumer.php deleted file mode 100644 index 26aa5fd..0000000 --- a/RedisQueueConsumer.php +++ /dev/null @@ -1,148 +0,0 @@ -redis = $redis; - $this->queues = $queues; - } - - /** - * @return int - */ - public function getRetryDelay(): int - { - return $this->retryDelay; - } - - /** - * @param int $retryDelay - */ - public function setRetryDelay(int $retryDelay): void - { - $this->retryDelay = $retryDelay; - } - - public function receiveMessage(int $timeout): ?RedisMessage - { - $startAt = time(); - $thisTimeout = $timeout; - - if (null === $this->queueNames) { - foreach ($this->queues as $queue) { - $this->queueNames[] = $queue->getName(); - } - } - - while ($thisTimeout > 0) { - $this->migrateExpiredMessages($this->queueNames); - - if ($result = $this->redis->brpop($this->queueNames, $thisTimeout)) { - $this->pushQueueNameBack($result->getKey()); - - if ($message = $this->processResult($result)) { - return $message; - } - - $thisTimeout -= time() - $startAt; - } - } - - return null; - } - - public function receiveMessageNoWait(RedisDestination $destination): ?RedisMessage - { - $this->migrateExpiredMessages([$destination->getName()]); - - if ($result = $this->redis->rpop($destination->getName())) { - return $this->processResult($result); - } - - return null; - } - - private function processResult(RedisResult $result): ?RedisMessage - { - $message = RedisMessage::jsonUnserialize($result->getMessage()); - - $now = time(); - - if ($expiresAt = $message->getHeader('expires_at')) { - if ($now > $expiresAt) { - return null; - } - } - - $message->setHeader('attempts', $message->getAttempts() + 1); - $message->setRedelivered($message->getAttempts() > 1); - $message->setReservedKey(json_encode($message)); - $message->setKey($result->getKey()); - - $reservedQueue = $result->getKey().':reserved'; - $retryMessageAt = $now + $this->retryDelay; - - $this->redis->zadd($reservedQueue, $message->getReservedKey(), $retryMessageAt); - - return $message; - } - - private function pushQueueNameBack($queueName): void - { - if (count($this->queueNames) <= 1) { - return; - } - - if (false === $from = array_search($queueName, $this->queueNames, true)) { - throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); - } - - $to = count($this->queueNames) - 1; - - $out = array_splice($this->queueNames, $from, 1); - array_splice($this->queueNames, $to, 0, $out); - } - - private function migrateExpiredMessages(array $queueNames): void - { - $now = time(); - - foreach ($queueNames as $queueName) { - $this->redis->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); - $this->redis->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); - } - } -} diff --git a/RedisSubscriptionConsumer.php b/RedisSubscriptionConsumer.php index bf7504e..36e6a17 100644 --- a/RedisSubscriptionConsumer.php +++ b/RedisSubscriptionConsumer.php @@ -9,6 +9,8 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer { + use RedisConsumerHelperTrait; + /** * @var RedisContext */ @@ -24,7 +26,7 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer /** * @var int */ - private $retryDelay; + private $redeliveryDelay = 300; /** * @param RedisContext $context @@ -38,17 +40,17 @@ public function __construct(RedisContext $context) /** * @return int */ - public function getRetryDelay(): ?int + public function getRedeliveryDelay(): ?int { - return $this->retryDelay; + return $this->redeliveryDelay; } /** - * @param int $retryDelay + * @param int $delay */ - public function setRetryDelay(int $retryDelay): void + public function setRedeliveryDelay(int $delay): void { - $this->retryDelay = $retryDelay; + $this->redeliveryDelay = $delay; } public function consume(int $timeout = 0): void @@ -66,14 +68,8 @@ public function consume(int $timeout = 0): void $queues[] = $consumer->getQueue(); } - $queueConsumer = new RedisQueueConsumer($this->context->getRedis(), $queues); - - if ($this->retryDelay) { - $queueConsumer->setRetryDelay($this->retryDelay); - } - while (true) { - if ($message = $queueConsumer->receiveMessage($timeout ?: 5)) { + if ($message = $this->receiveMessage($queues, $timeout ?: 5, $this->redeliveryDelay)) { list($consumer, $callback) = $this->subscribers[$message->getKey()]; if (false === call_user_func($callback, $message, $consumer)) { @@ -134,4 +130,9 @@ public function unsubscribeAll(): void { $this->subscribers = []; } + + private function getContext(): RedisContext + { + return $this->context; + } } diff --git a/Serializer.php b/Serializer.php new file mode 100644 index 0000000..a936a93 --- /dev/null +++ b/Serializer.php @@ -0,0 +1,12 @@ +serializer = $serializer; + } + + /** + * @return Serializer + */ + public function getSerializer() + { + return $this->serializer; + } +} diff --git a/Tests/Functional/CommonUseCasesTrait.php b/Tests/Functional/CommonUseCasesTrait.php index 02f63d6..b80ea97 100644 --- a/Tests/Functional/CommonUseCasesTrait.php +++ b/Tests/Functional/CommonUseCasesTrait.php @@ -61,7 +61,10 @@ public function testProduceAndReceiveOneMessageSentDirectlyToQueue() $this->assertEquals(__METHOD__, $message->getBody()); $this->assertEquals(['FooProperty' => 'FooVal'], $message->getProperties()); - $this->assertEquals(['BarHeader' => 'BarVal'], $message->getHeaders()); + $this->assertCount(3, $message->getHeaders()); + $this->assertSame(1, $message->getHeader('attempts')); + $this->assertSame('BarVal', $message->getHeader('BarHeader')); + $this->assertNotEmpty('BarVal', $message->getHeader('message_id')); } public function testProduceAndReceiveOneMessageSentDirectlyToTopic() @@ -99,7 +102,7 @@ public function testConsumerReceiveMessageWithZeroTimeout() $actualMessage = $consumer->receive(0); $this->assertInstanceOf(RedisMessage::class, $actualMessage); - $consumer->acknowledge($message); + $consumer->acknowledge($actualMessage); $this->assertEquals(__METHOD__, $message->getBody()); } diff --git a/Tests/RedisProducerTest.php b/Tests/RedisProducerTest.php index 9893482..fe45bf5 100644 --- a/Tests/RedisProducerTest.php +++ b/Tests/RedisProducerTest.php @@ -54,7 +54,19 @@ public function testShouldCallLPushOnSend() $redisMock ->expects($this->once()) ->method('lpush') - ->with('aDestination', '{"body":"","properties":[],"headers":[]}') + ->willReturnCallback(function (string $key, string $value) { + $this->assertSame('aDestination', $key); + + $message = json_decode($value, true); + + $this->assertArrayHasKey('body', $message); + $this->assertArrayHasKey('properties', $message); + $this->assertArrayHasKey('headers', $message); + $this->assertNotEmpty($message['headers']['message_id']); + $this->assertSame(0, $message['headers']['attempts']); + + return true; + }) ; $producer = new RedisProducer($redisMock); diff --git a/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php index 21d514f..16dbd8f 100644 --- a/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php +++ b/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -35,7 +35,7 @@ protected function createQueue(Context $context, $queueName) { /** @var RedisDestination $queue */ $queue = parent::createQueue($context, $queueName); - $context->getRedis()->del($queueName); + $context->deleteQueue($queue); return $queue; }