Ko-worker is a PHP library that abstracts the queue and RPC messaging patterns that can be implemented over RabbitMQ. Its other goal is to simplify message dispatching and RPC server development over RabbitMQ.
$m = new Ko\AmqpBroker($config);
$m->getProducer('upload_picture')->publish($message);
Whenever you start using queues, you have to decide how to process your messages. It does not sound complicated:
- write a console application or daemon
- grab messages from the queue and execute them in the master or a child process
- restart a child process if it fails
- log crashes, and so on
- ...
Ko-worker reduces this to just two steps:
- create a config file
- run the ko application
$ ./bin/ko --workers 5 --consumer upload_picture
Why Ko-worker?
- Used in the real-life, high-load GameNet project
- Close to 100% code coverage
- Simple: less than 5 minutes to get started
PHP >= 5.4
pcntl extension installed
posix extension installed
amqp extension installed
The recommended way to install the library is through Composer. You can see package information on Packagist.
{
    "require": {
        "misterion/ko-worker": "*"
    }
}
Just clone the repository and set up autoloading for the Ko namespace.
Look at the examples folder.
UNDONE
First of all, you should create your RabbitMQ configuration file, for example config.yaml:
connections:
  default:
    host: 'localhost'
    port: 5672
    login: 'guest'
    password: 'guest'
    vhost: '/'
producers:
  social_activity:
    connection: default
    exchange_options: {name: 'social_activity_exchange', type: direct, durable: 1, passive: 1}
consumers:
  social_activity:
    connection: default
    queue_options:
      name: 'social_activity_queue'
      durable: 1
      autodelete: 1
      exclusive: 0
      qos: 5
      binding: {name: social_activity_exchange, routing-keys: '*'}
    class: \MyProject\TestAction
Here we configure the exchange producer and the queue consumer that our application will have. In this example, Ko\AmqpBroker will contain a producer named social_activity and a consumer named social_activity.
You should specify a connection for the client.
If you need to add optional queue or exchange arguments, then your options can be something like this:
queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
Another example, with a message TTL of 20 seconds:
queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
The argument value must be a list of datatype and value. Valid datatypes are:
- S: String
- I: Integer
- D: Decimal
- T: Timestamps
- F: Table
- A: Array
Adapt the arguments according to your needs.
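The datatype/value pairs described above can be checked before they are written to the config. The sketch below is only an illustration: isValidAmqpArgument() is a hypothetical helper, not part of Ko-worker, and it checks only the shape of the pair, not whether the value fits the declared type.

```php
<?php
/**
 * Hypothetical helper (not part of Ko-worker): checks that an argument
 * is a two-element list whose first element is one of the datatype
 * codes listed above.
 */
function isValidAmqpArgument($argument)
{
    $types = array('S', 'I', 'D', 'T', 'F', 'A');

    return is_array($argument)
        && count($argument) === 2
        && array_key_exists(0, $argument)
        && array_key_exists(1, $argument)
        && in_array($argument[0], $types, true);
}

// The two examples from the configuration above.
var_dump(isValidAmqpArgument(array('S', 'all')));
var_dump(isValidAmqpArgument(array('I', 20000)));
```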
If you want to bind a queue with specific routing keys, you can declare them in the producer or consumer config:
queue_options:
  binding:
    - {name: "social_activity_exchange", routing_keys: 'social.#.addFriends'}
    - {name: "social_activity_exchange", routing_keys: '*.removeFriends'}
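To see which routing keys such patterns would accept, here is a minimal sketch of AMQP topic matching, where * stands for exactly one dot-separated word and # for zero or more words. It assumes the exchange is declared with type topic; on a direct exchange the binding key is compared literally. The functions topicMatches() and matchTopicWords() are hypothetical and are not part of Ko-worker; the broker performs this matching itself.

```php
<?php
/**
 * Hypothetical helper: does $routingKey match the topic $pattern?
 */
function topicMatches($pattern, $routingKey)
{
    $patternWords = $pattern === '' ? array() : explode('.', $pattern);
    $keyWords = $routingKey === '' ? array() : explode('.', $routingKey);

    return matchTopicWords($patternWords, $keyWords);
}

function matchTopicWords(array $pattern, array $key)
{
    if (count($pattern) === 0) {
        // The pattern is exhausted: match only if the key is exhausted too.
        return count($key) === 0;
    }

    $head = $pattern[0];
    $rest = array_slice($pattern, 1);

    if ($head === '#') {
        // '#' may absorb zero or more words of the key.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchTopicWords($rest, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }

    if (count($key) === 0) {
        return false;
    }

    // '*' matches any single word; otherwise the words must be equal.
    if ($head === '*' || $head === $key[0]) {
        return matchTopicWords($rest, array_slice($key, 1));
    }

    return false;
}

var_dump(topicMatches('social.#.addFriends', 'social.user.42.addFriends'));
var_dump(topicMatches('*.removeFriends', 'social.user.removeFriends'));
```

Under these rules the first call matches and the second does not, because * cannot span the two words social.user.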
In a messaging application, the process sending messages to the broker is called the producer, while the process receiving those messages is called the consumer. Your application will have several of them, which you can list under their respective entries in the configuration.
A producer is used to send messages to the server. In the AMQP model, messages are sent to an exchange. This means that in the configuration for a producer you have to specify the connection options along with the exchange options, which usually are the name of the exchange and its type.
Now let's say that you want to process some social network activity in the background. After you add a friend, you publish a message to the server with the following information:
public function addFriendAction($name)
{
    $msg = array('user_id' => 1235, 'friend_id' => '67890');
    $this->broker->getProducer('social_activity')->publish(json_encode($msg));
}
Besides the message itself, the Ko\RabbitMq\Producer#publish() method also accepts an optional routing key parameter and an optional array of additional properties. This way, for example, you can change the application headers.
The next piece of the puzzle is to have a consumer that will take the message out of the queue and process it accordingly.
A consumer connects to the server and starts a loop, waiting for incoming messages to process. Its behavior depends on the class specified for it. Let's review the consumer configuration from above:
consumers:
  social_activity:
    connection: default
    queue_options:
      name: 'social_activity_queue'
      durable: 1
      autodelete: 1
      exclusive: 0
      qos: 5
      binding: {name: social_activity_exchange, routing-keys: '*'}
    class: \MyProject\TestAction
As we see there, the class option refers to the \MyProject\TestAction class. It should implement Ko\Worker\ActionInterface.
When the consumer gets a message from the server, it creates and executes an instance of that class. If you need a different class for testing or debugging purposes, you can change it there.
Apart from the callback, we also specify the connection to use, the same way as we do with a producer. The remaining options are the queue_options, where we provide a queue name and a binding.
Why?
As we said, messages in AMQP are published to an exchange. This doesn't mean the message has reached a queue. For this to happen, we first need to create such a queue and then bind it to the exchange.
The cool thing about this is that you can bind several queues to one exchange, so that one message can arrive at several destinations.
The advantage of this approach is the decoupling of the producer from the consumer. The producer does not care how many consumers will process its messages.
All it needs is for its message to arrive at the server. In this way we can expand the actions we perform every time a friend is added without changing the code in our controller.
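The routing idea can be illustrated with a toy in-memory model. This is only a sketch: ToyDirectExchange is a hypothetical class that imitates a direct exchange inside one PHP process, it does not talk to RabbitMQ or use Ko-worker, and the notification_queue name and the friend.added routing key are made up for the example.

```php
<?php
/**
 * Toy model of a direct exchange (hypothetical, not part of Ko-worker).
 */
class ToyDirectExchange
{
    private $bindings = array(); // routing key => list of queue names
    private $queues = array();   // queue name => list of messages

    public function bind($queueName, $routingKey)
    {
        $this->bindings[$routingKey][] = $queueName;
        if (!isset($this->queues[$queueName])) {
            $this->queues[$queueName] = array();
        }
    }

    public function publish($message, $routingKey)
    {
        if (!isset($this->bindings[$routingKey])) {
            return; // no queue is bound with this key: the message is dropped
        }
        // Every queue bound with this key receives its own copy.
        foreach ($this->bindings[$routingKey] as $queueName) {
            $this->queues[$queueName][] = $message;
        }
    }

    public function messagesIn($queueName)
    {
        return isset($this->queues[$queueName]) ? $this->queues[$queueName] : array();
    }
}

$exchange = new ToyDirectExchange();
$exchange->bind('social_activity_queue', 'friend.added');
$exchange->bind('notification_queue', 'friend.added');

// The producer publishes once and knows nothing about the queues.
$exchange->publish('{"user_id":1235}', 'friend.added');

print_r($exchange->messagesIn('social_activity_queue'));
print_r($exchange->messagesIn('notification_queue'));
```

Adding a third bound queue would require no change to the publishing code, which is the decoupling described above.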
Now, how to run a consumer? There's a command for it that can be executed like this:
$ ./bin/ko --c ../config_queue.yaml --q social_activity --w 10
What does this mean?
We are running the social_activity consumer and telling it to use 10 child processes for consuming.
Every time the consumer receives a message from the server, it executes the configured callback, passing the AMQP message as an instance of the AMQPEnvelope class. The message body can be obtained by calling $msg->getBody().
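Once the body is available, the callback usually has to decode it. The sketch below assumes the body is the JSON published by addFriendAction() above. decodeSocialActivity() is a hypothetical helper that takes the body string (for example, the result of $msg->getBody()); it is not part of Ko-worker and does not depend on the amqp extension.

```php
<?php
/**
 * Hypothetical helper: decode and validate a social_activity message body.
 *
 * @param string $body raw message body, expected to be JSON
 * @return array with integer 'user_id' and 'friend_id'
 * @throws InvalidArgumentException when the body is not the expected JSON
 */
function decodeSocialActivity($body)
{
    $data = json_decode($body, true);

    if (!is_array($data) || !isset($data['user_id'], $data['friend_id'])) {
        throw new InvalidArgumentException('Unexpected message body');
    }

    // The producer example sends friend_id as a string, so normalize both ids.
    return array(
        'user_id' => (int) $data['user_id'],
        'friend_id' => (int) $data['friend_id'],
    );
}

$body = json_encode(array('user_id' => 1235, 'friend_id' => '67890'));
var_dump(decodeSocialActivity($body));
```

Rejecting malformed bodies early keeps a bad message from failing deeper inside the processing code.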
By default the consumer will process messages in an endless loop for some definition of endless.
UNDONE
Ko-worker comes with a nice ko-package utility, which allows you to create an executable phar from your worker project.
$ ./bin/ko-package
ko-package version 0.0.1
Usage: ko-package [options] [operands]
Options:
-p, --path <arg> Path to application code
-o, --output [<arg>] Output file name
-v, --version [<arg>] Application version
-n, --name [<arg>] Application name
-e, --exclude [<arg>] Excluded folders like logs, test, etc.
Ko-worker was written as part of the GameNet project by Nikolay Bondarenko (misterionkell at gmail.com) and Vadim Sabirov (pr0head at gmail.com). We use some interesting ideas from Alvaro Videla's Thumper and part of the documentation from RabbitMqBundle.
Released under the MIT license.