composer require beta/bx.kafka.agent
Далее устанавливаем модуль (bx.kafka.agent) через админку - /bitrix/admin/partner_modules.php?lang=ru
После установки в корне проекта появится 2 файла:
- kfagent - скрипт для запуска проекта
- kfagent.service - сервис для подсистемы systemd
Далее переносим сервис для запуска:
mv kfagent.service /etc/systemd/system/
Перезапускаем конфигурацию systemd:
sudo systemctl daemon-reload
Активируем сервис:
sudo systemctl enable kfagent
И запускаем его:
sudo systemctl start kfagent
use Bx\Kafka\Agent\Manager;
// через init.php
Manager::getInstance()->addEventHandler(
'employee', // название топика
'my.module',
'SomeNamespace\\MyClass',
'someStaticMethod'
);
// через миграцию
Manager::getInstance()->registerEventHandler(
'employee', // название топика
'my.module',
'SomeNamespace\\MyClass',
'someStaticMethod'
);
use Bx\Kafka\Agent\Manager;
use Bx\Kafka\Agent\NewMessageSubject;
class MyNewEmployeeObserver implements SplObserver
{
public function update(SplSubject $subject): void
{
if (!($subject instanceof NewMessageSubject)) {
return;
}
$subject->getMessage()->getData(); // получаем данные из брокера
}
}
Manager::getInstance()->addObserver(
'employee', // название топика
new MyNewEmployeeObserver() // экземпляр наблюдателя
);