This package provides 2 layers for abstraction of message broker.
- A connection layer
- A destination layer
Message Broker | Library | Driver name |
---|---|---|
Beanstalk | Pheanstalk | pheanstalk |
Db | Doctrine | doctrine+(*) |
Enqueue | php-enqueue | enqueue+(*) |
Gearman | Pecl Gearman | gearman |
Kafka | RdKafka | rdkafka |
Memory | memory | |
Null | null | |
RabbitMQ | Amqp lib | amqp-lib |
Redis (Ext) | PhpRedis | redis+phpredis |
Redis | PRedis | redis+predis |
First, create a new destination manager instance.
<?php
use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;
use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection;
use Bdf\Queue\Destination\ConfigurationDestinationFactory;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Destination\DestinationFactory;
use Bdf\Queue\Serializer\JsonSerializer;
// Declare connections
$driverFactory = new ResolverConnectionDriverFactory([
'foo' => [
'driver' => 'pheanstalk',
'host' => 'localhost',
'port' => '11300',
'additionalOption' => 'value',
]
// OR use DSN 'foo' => 'pheanstalk://localhost:11300?additionalOption=value'
]);
// Declare drivers
$driverFactory->addDriverResolver('pheanstalk', function($config) {
//echo $config['connection'] displays "foo"
return new PheanstalkConnection($config['connection'], new JsonSerializer());
});
// Declare destination
// You can also declare your custom destination that defined type of transport (queue, multi queues, topic, ...),
// the connection to use, and the name of the queue(s) / topic(s) to use.
// This example will use the queue driver of the "foo" connection defined above. And send / consume message on the queue named "default".
$destinationFactory = new DestinationFactory(
$driverFactory,
['my_destination' => 'queue://foo/default']
);
// To send a message to multiple destinations, you can use "aggregate" destination type.
// You can use a wildcard to send to all destinations that match the pattern.
// In this example, 'user' destination will be sent to the "foo" and "bar" queues, and to all topics that match the pattern "*.user"
$destinationFactory = new DestinationFactory(
$driverFactory,
[
'foo' => 'queue://test/foo',
'bar' => 'queue://test/bar',
'a.user' => 'topic://a/user',
'b.user' => 'topic://b/user',
'user' => 'aggregate://foo,bar,*.user',
]
);
// Create the manager
$manager = new DestinationManager($driverFactory, $destinationFactory);
Push a basic message into the queue. The consume should defined handler to process the message.
<?php
use Bdf\Queue\Message\Message;
$message = Message::create('Hello world');
$message->setDestination('my_destination');
// or use a lower level setting the connection and queue.
$message = Message::create('Hello world', 'queue');
$message->setConnection('foo');
/** @var Bdf\Queue\Destination\DestinationManager $manager */
$manager->send($message);
Useful for monolithic application that needs to differ a process. Push a message job into the queue. The consumer will evaluate the job string and run the processor. In this use case the producer and the receiver share the same model.
<?php
$message = \Bdf\Queue\Message\Message::createFromJob(Mailer::class.'@send', ['body' => 'my content']);
$message->setDestination('my_destination');
/** @var Bdf\Queue\Destination\DestinationManager $manager */
$manager->send($message);
The class Bdf\Queue\Destination\DsnDestinationFactory
provides default type of destination:
Name | Exemple | Definition |
---|---|---|
queue | queue://connection_name/queue_name | Publish and consume a single queue |
queues | queues://connection_name/queue1,queue2 | Only consume multi queues |
topic | topic://connection_name/topic | Publish and consume a topic. Pattern with wildcard are allowed for consumer use case only (ex: topic.*) |
topics | topics://connection_name/topic1,topic2 | Only consume multi topics |
You can declare your own type:
<?php
use Bdf\Dsn\DsnRequest;
use Bdf\Queue\Connection\ConnectionDriverInterface;
use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;
/** @var ResolverConnectionDriverFactory $driverFactory */
$destinationFactory = new Bdf\Queue\Destination\DsnDestinationFactory($driverFactory);
$destinationFactory->register('my_own_type', function(ConnectionDriverInterface $connection, DsnRequest $dsn) {
// ...
});
// use dsn as "my_own_type://connection/queue_or_topic_name?option="
The consumer layer provides many tools for message handling. The default stack of objects that will receive the message is:
consumer (ConsumerInterface) -> receivers (ReceiverInterface) -> processor (ProcessorInterface) -> handler (callable)
consumer
has the strategy for reading the message from queue / topic. It also manage a graceful shutdown.receivers
is the stack of middlewares interacts with the envelope.processor
resolves the handler arguments. You can plug here your business logic and remove the handler layer. By default processor injects 2 arguments in handlers: the message data and the envelope.handler
manages the business logic. Handler allows an interface less mode.
An example to consume a simple message:
<?php
use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\CallbackProcessor;
use Bdf\Queue\Processor\MapProcessorResolver;
// Create your processor and declare in a map:
$myProcessor = new CallbackProcessor(function($data) {
echo $data;
});
$processorResolver = new MapProcessorResolver(['foo' => $myProcessor]);
/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
Consume a job message:
<?php
use Bdf\Instantiator\Instantiator;
use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\JobHintProcessorResolver;
/** @var Instantiator $instantiator */
// The job should be provided from message to get the processor
$processorResolver = new JobHintProcessorResolver($instantiator);
/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
<?php
/** @var Bdf\Queue\Destination\DestinationManager $manager */
class MyHandler
{
public function handle($data, \Bdf\Queue\Message\EnvelopeInterface $envelope)
{
echo $data; // Display 'foo'
// Ack the message. Default behavior. The ack is sent before the call by the consumer.
$envelope->acknowledge();
// Reject the message. It will be no more available. The message is rejected if and exception is thrown.
$envelope->reject();
// Reject the message and send it back to the queue
$envelope->reject(true);
}
}
$message = \Bdf\Queue\Message\Message::createFromJob(MyHandler::class, 'foo', 'queue');
$manager->send($message);
Use the synthax "Class@method"
to determine the callable (By default the method is "handle")
or register your handlers on a specific destination with the receiver builder:
<?php
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoaderInterface;
use Psr\Container\ContainerInterface;
/** @var ContainerInterface $container */
/** @var Bdf\Queue\Destination\DestinationManager $manager */
$container->set(ReceiverLoaderInterface::class, function (ContainerInterface $container) {
return new ReceiverLoader(
$container,
[
'destination_name or connection_name' => function(ReceiverBuilder $builder) {
/** @var \Bdf\Queue\Processor\ProcessorInterface $myProcessor */
/** @var \Bdf\Queue\Consumer\ReceiverInterface $myReceiver */
// Register your unique handler for the destination or connection.
// all message will be handled by this handler.
$builder->handler(MyHandler::class);
// Or register your unique processor
$builder->processor($myProcessor);
// Or register the job bearer resolver as processor. The procesor will resolve the job
// from the Message::$job attribute value.
$builder->jobProcessor();
// Or register your own processor or handler by queue in case you consume a connection.
// By default the key of the map is the queue name. You can provide your own key provider
// with the second parameter.
$builder->mapProcessor([
'queue1' => $myProcessor,
'queue2' => MyHandler::class,
]);
// Or register your final own receiver
$builder->outlet($myReceiver);
// Or register your own receiver in the stack
$builder->add($myReceiver);
// You can add more defined middlewares here
// $builder->retry(2);
}
]
);
});
$receiver = $container->get(ReceiverLoaderInterface::class)->load('destination_name or connection_name')->build();
$manager->create('queue://foo')->consumer($receiver)->consume(0);
$ example/consumer.php "connection name OR destination name"
The consumer use a stack of receivers to extend the reception of messages.
See the interface Bdf\Queue\Consumer\ReceiverInterface
and the trait Bdf\Queue\Consumer\DelegateHelper
.
<?php
class MyExtension implements \Bdf\Queue\Consumer\ReceiverInterface
{
use \Bdf\Queue\Consumer\DelegateHelper;
private $options;
/**
* MyExtension constructor.
*/
public function __construct(\Bdf\Queue\Consumer\ReceiverInterface $delegate, array $options)
{
$this->delegate = $delegate;
$this->options = $options;
}
/**
* {@inheritdoc}
*/
public function receive($message, \Bdf\Queue\Consumer\ConsumerInterface $consumer): void
{
// Do something when receiving message
if ($message->queue() === 'foo') {
return;
}
// Call the next receiver
$this->delegate->receive($message, $consumer);
}
}
You can use the Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader::add()
to register your receiver in the stack
<?php
$options = ['foo' => 'bar'];
/** @var \Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder $builder */
$builder->add(MyExtension::class, [$options]);
The class Bdf\Queue\Serializer\SerializerInterface
manage the payload content sent to the message broker.
By default metadata are added to the json as:
- PHP Type: to help consumer to deserialize complex entities.
- Message info: The attempt number for retry, The sending date, ...
A basic payload looks like:
{
"name": "Foo",
"data": "Hello World",
"date": "2019-12-23T16:02:03+01:00"
}
You can customize the string with your own implementation of the serializer interface.
Try the hello world example (configure the message broker in example/config/connections.php
):
$ example/producer.php foo '{"name":"Foo", "data":"Hello World"}' --raw
$ example/consumer.php foo
<?php
use Bdf\Queue\Message\InteractEnvelopeInterface;
use Bdf\Queue\Message\Message;
class RpcReplyHandler
{
public function doSomethingUseful(int $number, InteractEnvelopeInterface $envelope)
{
// Send bask: 1 x 2 to client
$envelope->reply($number * 2);
// Or retry in 10sec
$envelope->retry(10);
}
}
$message = Message::createFromJob(RpcReplyHandler::class.'@doSomethingUseful', 1, 'queue');
$message->setConnection('foo');
/** @var Bdf\Queue\Destination\DestinationManager $manager */
$promise = $manager->send($message);
// Consume the foo connection
// Receive data from the reply queue. If the header "replyTo" is not set,
// the response will be sent to "queue_reply"
echo $promise->await(500)->data(); // Display 2
Option | Type | Supports | Description |
---|---|---|---|
driver |
string | all | The name of the driver to use. See driver name in support section. |
vendor |
string | all | Second part of the protocol. Vendor is used by some driver that use internal drivers. |
queue |
string | all | The default queue of the connection used only if no queue has been set on the message. Destination should provide the queue. |
host |
string | all | The host / ip to connect to message broker. Usually set to localhost . |
port |
int | all | The port of the message broker. Usually set to the default port. |
user |
string | all | |
password |
string | all | |
prefetch |
int | all | Load a number of message in memory. Faster for some broker that supports reservation |
serializer |
string | all | Load a serializer for this connection. Used only by driver that needs serializer. |
vhost |
string | amqp-lib | Default / . |
group |
string | amqp-lib | Group use by topic to allows set of consumers on the same topic. Default bdf . |
sleep_duration |
int | amqp-lib | The internal sleep in milliseconds between two pop. Default 200 . |
queue_flags |
int | amqp-lib | The flag for queue declaration. See AmqpDriver constants. Default 2 (FLAG_QUEUE_DURABLE value). |
topic_flags |
int | amqp-lib | The flag for topic declaration. See AmqpDriver constants. Default 0 (FLAG_NOPARAM value). |
consumer_flags |
int | amqp-lib | The flag for consumer. See AmqpDriver constants. Default 0 (FLAG_NOPARAM value). |
auto_declare |
bool | amqp-lib, redis, enqueue | Auto declare the queue when pushing or poping. Use queue setup command otherwise. Default false . |
qos_prefetch_size |
int | amqp-lib | Prefetch optimisation. Default 0 . |
qos_prefetch_count |
int | amqp-lib | Prefetch optimisation. Default 1 . |
qos_global |
int | amqp-lib | Prefetch optimisation. Default false . |
table |
string | doctrine | The table name to use to store message. Default value doctrine_queue |
ttr |
int | pheanstalk | Time to run in seconds. Can also be defined in message header. Default 60 . |
client-timeout |
int | pheanstalk, gearman | Timeout of client in milliseconds. Disable by default. |
commitAsync |
bool | rdkafka | Enable asynchrone ack. Default false . |
offset |
int | rdkafka | Position to start consumer. Default null . |
partition |
int | rdkafka | Partition to for the consumer, see kafka constant. Default -1 (RD_KAFKA_PARTITION_UA value). |
global |
array | rdkafka | Kafka config for global settings. |
producer |
array | rdkafka | Kafka config for producer. |
consumer |
array | rdkafka | Kafka config for the consume |
poll_timeout |
int | rdkafka | The timeout for the poll method in milliseconds. |
flush_timeout |
int | rdkafka | The timeout for the flush method in milliseconds. |
dr_msg_cb |
callable | rdkafka | Delivery report callback. |
error_cb |
callable | rdkafka | Error callback. |
rebalance_cb |
callable | rdkafka | Called after consumer group has been rebalanced. |
stats_cb |
callable | rdkafka | Statistics callback. |
partitioner |
string | rdkafka | Kafka partitioner for topic settings. |
group |
string | rdkafka | Group use by topic to allows set of consumers on the same topic. Default "2" . |
timeout |
int | redis | The connection timeout in seconds. Default 0 . |
prefix |
string | redis | The key prefix. Default queues: . |
Note:
- Format of a valid DSN: {driver}+{vendor}://{user}:{password}@{host}:{port}/{queue}?{option}=value
- See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for more kafka options.
Option | Type | Supports | Description |
---|---|---|---|
flags |
int | amqp-lib | The flags for message. See driver constants. |
priority |
int | pheanstalk | Priority message. Default 1024 . |
ttr |
int | pheanstalk | Time to run in seconds. Default 60 . |
key |
string | rdkafka | |
partition |
int | rdkafka | The number of the partition. |
simple job / closure job
Serializer | Serializer | +Compress | Bdf JSON | +Compress | Bdf binary |
---|---|---|---|---|---|
Size | 141 / 377 | 105 / 244 | 109 / 407 | 76 / 247 | 98 / 355 |
Serialize time | 0.0014 / 6.8 | 0.016 / 7 | 0.011 / 7 | 0.026 / 7 | 0.011 / 7 |
Unserialize time | 0.007 / 0.0025 | 0.0082 / 0.0068 | 0.024 / 0.015 | 0.024 / 0.019 | 0.019 / 0.011 |
- For the best execution time, regardless of size, use the default
Serializer
- For the smaller size, regardless of time, use
BdfSerializer
withCompressedSerializer
- For the best compromise, use
Serializer
withCompressedSerializer
- Always smaller than pure
BdfSerializer
(JSON or Binary) - Faster on unserialize, slightly slower on serialize
- Around twice faster than compressed bdf, but only ~40% larger on simple job
- Always smaller than pure
Distributed under the terms of the MIT license.