Skip to content

mgid/kafka-bundle

Repository files navigation

Symfony Kafka Bundle

Scrutinizer Code Quality Code Coverage Total Downloads Latest Stable Version License

How to use

  • Install package
composer req mgid/kafka-bundle
  • Create consumer. Example (src/Consumer/EmailSendConsumer.php):
<?php

namespace App\Consumer;

use Swift_Mailer;
use Mgid\KafkaBundle\Command\Consumer;

class EmailSendConsumer extends Consumer
{
    public const QUEUE_NAME = 'email_send_queue';

    /**
     * @var Swift_Mailer
     */
    private $mailer;

    /**
     * @required
     *
     * @param Swift_Mailer $mailer
     */
    public function setMailer(Swift_Mailer $mailer)
    {
        $this->mailer = $mailer;
    }

    /**
     * {@inheritdoc}
     */
    protected function onMessage(array $data): void
    {
        $message = (new \Swift_Message($data['subject']))
            ->setFrom($data['sender'])
            ->setTo($data['recipient'])
            ->setBody($data['body']);

        $this->mailer->send($message);
    }
}
  • Produce message. Example (src/Service/EmailService.php):
<?php

namespace App\Service;

use App\Consumer\DemoConsumer;
use Mgid\KafkaBundle\DependencyInjection\Traits\ProducerTrait;

class EmailService
{
    use ProducerTrait;

    /**
     * @param array $data
     */
    public function send(array $data): void
    {
        $this->producer->send(DemoConsumer::QUEUE_NAME, $data);
    }
}
  • Run consumer. Example:
php bin/console app:consumer:email-send

Default configuration

# config/packages/mgid_kafka.yaml
mgid_kafka:
    producers:
        configuration:
            group.id: 'main_group'
            log.connection.close: 'false'
            metadata.broker.list: '%env(KAFKA_BROKERS)%'
            queue.buffering.max.messages: 100000

    consumers:
        configuration:
            group.id: 'main_group'
            auto.offset.reset: 'smallest'
            log.connection.close: 'false'
            metadata.broker.list: '%env(KAFKA_BROKERS)%'

Read more about supported configuration properties: librdkafka configuration.