This Symfony bundle provides connectivity to the Kafka publish-subscribe messaging system based on rdkafka binding to librdkafka
Add the dependency in your composer.json
{
"require": {
"mshauneu/php-rdkafka-bundle"
}
}
Enable the bundle in your application kernel
// app/AppKernel.php
public function registerBundles() {
$bundles = array(
// ...
new Mshauneu\RdKafkaBundle\MshauneuRdKafkaBundle(),
);
}
Simple configuration could look like:
mshauneu_rd_kafka:
producers:
test_producer:
brokers: 127.0.0.1:9092
topic: test_topic
consumers:
test_consumer:
brokers: 127.0.0.1:9092
topic: test_topic
properties:
group_id: "test_group_id"
topic_properties:
offset_store_method: broker
auto_offset_reset: smallest
auto_commit_interval_ms: 100
Configuration properties are documented:
- for producer or consumer in CommunicatorConfiguration.php
- for topic to produce in TopicProducerConfiguration.php
- for topic to consume in TopicConsumerConfiguration.php
From a Symfony controller:
$payload = 'test_message';
$topicProducer = $container->get('mshauneu_rd_kafka')->getProducer("test_producer");
$topicProducer->produceStart();
$topicProducer->produce("message");
$topicProducer->produceStop();
By CLI:
./app/console kafka:producer --producer test_producer test_message
Implement ConsumerInterface
class MessageHandler implements ConsumerInterface {
public function consume($topic, $partition, $offset, $key, $payload) {
echo "Received payload: " . $payload . PHP_EOL;
}
}
Register it:
test_message_handler:
class: MessageHandler
From a Symfony controller:
$topicConsumer = $container->get('mshauneu_rd_kafka')->getConsumer("test_producer");
$topicConsumer->consumeStart(TopicCommunicator::OFFSET_STORED);
$topicConsumer->consume($consumerImpl);
$topicConsumer->consumeStop();
By CLI:
./app/console kafka:consumer --consumer test_consumer --handler test_message_handler
This project is under the MIT License. See the LICENSE file for the full license text.