Skip to content

Commit

Permalink
Add Producer events (#728)
Browse files Browse the repository at this point in the history
* Add events for before and after producer publishes a message

* Add documentation for producer events

* Fix documented return type
  • Loading branch information
nathanjrobertson authored Oct 17, 2024
1 parent 8c18f6d commit 242f2ed
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 1 deletion.
29 changes: 28 additions & 1 deletion Event/AMQPEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace OldSound\RabbitMqBundle\Event;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\Producer;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\EventDispatcher\Event;

/**
* Class AMQPEvent
Expand All @@ -18,6 +18,8 @@ class AMQPEvent extends AbstractAMQPEvent
public const ON_IDLE = 'on_idle';
public const BEFORE_PROCESSING_MESSAGE = 'before_processing';
public const AFTER_PROCESSING_MESSAGE = 'after_processing';
public const BEFORE_PUBLISH_MESSAGE = 'before_publishing';
public const AFTER_PUBLISH_MESSAGE = 'after_publishing';

/**
* @var AMQPMessage
Expand All @@ -29,6 +31,11 @@ class AMQPEvent extends AbstractAMQPEvent
*/
protected $consumer;

/**
* @var Producer
*/
protected $producer;

/**
* @return AMQPMessage
*/
Expand Down Expand Up @@ -68,4 +75,24 @@ public function setConsumer(Consumer $consumer)

return $this;
}

/**
* @return Producer
*/
public function getProducer()
{
return $this->producer;
}

/**
* @param Producer $producer
*
* @return AMQPEvent
*/
public function setProducer(Producer $producer)
{
$this->producer = $producer;

return $this;
}
}
41 changes: 41 additions & 0 deletions Event/AfterProducerPublishMessageEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace OldSound\RabbitMqBundle\Event;

use OldSound\RabbitMqBundle\RabbitMq\Producer;
use PhpAmqpLib\Message\AMQPMessage;

/**
* Class AfterProducerPublishMessageEvent
*
* @package OldSound\RabbitMqBundle\Command
*/
class AfterProducerPublishMessageEvent extends AMQPEvent
{
public const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;

/**
* @var string
*/
protected $routingKey;

/**
* AfterProducerPublishMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Producer $producer, AMQPMessage $AMQPMessage, string $routingKey)
{
$this->setProducer($producer);
$this->setAMQPMessage($AMQPMessage);
$this->routingKey = $routingKey;
}

/**
* @return string
*/
public function getRoutingKey()
{
return $this->routingKey;
}
}
41 changes: 41 additions & 0 deletions Event/BeforeProducerPublishMessageEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace OldSound\RabbitMqBundle\Event;

use OldSound\RabbitMqBundle\RabbitMq\Producer;
use PhpAmqpLib\Message\AMQPMessage;

/**
* Class BeforeProducerPublishMessageEvent
*
* @package OldSound\RabbitMqBundle\Command
*/
class BeforeProducerPublishMessageEvent extends AMQPEvent
{
public const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;

/**
* @var string
*/
protected $routingKey;

/**
* BeforeProducerPublishMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Producer $producer, AMQPMessage $AMQPMessage, string $routingKey)
{
$this->setProducer($producer);
$this->setAMQPMessage($AMQPMessage);
$this->routingKey = $routingKey;
}

/**
* @return string
*/
public function getRoutingKey()
{
return $this->routingKey;
}
}
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,49 @@ If you need to use a custom class for a producer (which should inherit from `Old

The next piece of the puzzle is to have a consumer that will take the message out of the queue and process it accordingly.

#### Producer Events ####

There are currently two events emitted by the producer.

##### BeforeProducerPublishMessageEvent #####
This event occurs immediately before publishing the message. This is a good hook to do any final logging, validation, etc. before actually sending the message. A sample implementation of a listener:

```php
namespace App\EventListener;

use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

#[AsEventListener(event: BeforeProducerPublishMessageEvent::NAME)]
final class AMQPBeforePublishEventListener
{
public function __invoke(BeforeProducerPublishMessageEvent $event): void
{
// Your code goes here
}
}
```

##### AfterProducerPublishMessageEvent #####
This event occurs immediately after publishing the message. This is a good hook to do any confirmation logging, commits, etc. after actually sending the message. A sample implementation of a listener:

```php
namespace App\EventListener;

use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

#[AsEventListener(event: AfterProducerPublishMessageEvent::NAME)]
final class AMQPBeforePublishEventListener
{
public function __invoke(AfterProducerPublishMessageEvent $event): void
{
// Your code goes here
}
}
```


### Consumers ###

A consumer will connect to the server and start a __loop__ waiting for incoming messages to process. Depending on the specified __callback__ for such consumer will be the behavior it will have. Let's review the consumer configuration from above:
Expand Down
13 changes: 13 additions & 0 deletions RabbitMq/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent;
use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

Expand Down Expand Up @@ -63,6 +65,12 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = []
}

$real_routingKey = $routingKey !== null ? $routingKey : $this->defaultRoutingKey;

$this->dispatchEvent(
BeforeProducerPublishMessageEvent::NAME,
new BeforeProducerPublishMessageEvent($this, $msg, $real_routingKey)
);

$this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$real_routingKey);
$this->logger->debug('AMQP message published', [
'amqp' => [
Expand All @@ -72,5 +80,10 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = []
'headers' => $headers,
],
]);

$this->dispatchEvent(
AfterProducerPublishMessageEvent::NAME,
new AfterProducerPublishMessageEvent($this, $msg, $real_routingKey)
);
}
}

0 comments on commit 242f2ed

Please sign in to comment.