-
-
Notifications
You must be signed in to change notification settings - Fork 19
/
index.js
50 lines (40 loc) · 1.36 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
'use strict'
const fp = require('fastify-plugin')
const Producer = require('./lib/producer')
const Consumer = require('./lib/consumer')
/**
 * Plugin entry point: decorates the instance with an empty `kafka` namespace
 * and registers one sub-plugin per configured Kafka role.
 *
 * @param {object} fastify - Fastify instance being decorated.
 * @param {object} opts - Plugin options. A truthy `producer` registers
 *   `buildProducer`; a truthy `consumer` registers `buildConsumer`. The whole
 *   `opts` object is forwarded unchanged to each registered sub-plugin.
 * @param {Function} next - Invoked once registration has been queued.
 */
function fastifyKafka (fastify, opts, next) {
  fastify.decorate('kafka', {})

  // Option key -> sub-plugin, listed in registration order.
  const roles = [
    ['producer', buildProducer],
    ['consumer', buildConsumer]
  ]

  for (const [optionKey, builder] of roles) {
    if (opts[optionKey]) {
      fastify.register(fp(builder), opts)
    }
  }

  next()
}
/**
 * Sub-plugin that creates the Kafka producer and exposes it on `fastify.kafka`.
 *
 * Sets `fastify.kafka.producer` to the new `Producer` and `fastify.kafka.push`
 * to its bound `push` method, then installs an `onClose` hook that stops the
 * producer.
 *
 * @param {object} fastify - Fastify instance; must already carry a `kafka` object.
 * @param {object} opts - Plugin options; reads `producer`, `producerTopicConf`
 *   and `metadataOptions`.
 * @param {Function} next - Handed to the `Producer` constructor rather than
 *   called here; presumably the producer calls it when ready — TODO confirm
 *   against ./lib/producer.
 */
function buildProducer (fastify, opts, next) {
  const kafkaProducer = new Producer(
    opts.producer,
    fastify.log,
    next,
    opts.producerTopicConf,
    opts.metadataOptions
  )

  const kafka = fastify.kafka
  kafka.producer = kafkaProducer
  kafka.push = kafkaProducer.push.bind(kafkaProducer)

  // Resolve the producer from the instance at close time, not from the closure.
  const onClose = (instance, done) => {
    instance.kafka.producer.stop(done)
  }
  fastify.addHook('onClose', onClose)
}
/**
 * Sub-plugin that creates the Kafka consumer and exposes it on `fastify.kafka`.
 *
 * Sets `fastify.kafka.consumer` to the new `Consumer` and adds `consume`,
 * `subscribe` and `on` as bound shortcuts to the consumer's methods, then
 * installs an `onClose` hook that stops the consumer.
 *
 * @param {object} fastify - Fastify instance; must already carry a `kafka` object.
 * @param {object} opts - Plugin options; reads `consumer`, `consumerTopicConf`
 *   and `metadataOptions`.
 * @param {Function} next - Handed to the `Consumer` constructor rather than
 *   called here; presumably the consumer calls it when ready — TODO confirm
 *   against ./lib/consumer.
 */
function buildConsumer (fastify, opts, next) {
  const kafkaConsumer = new Consumer(
    opts.consumer,
    fastify.log,
    next,
    opts.consumerTopicConf,
    opts.metadataOptions
  )

  const kafka = fastify.kafka
  kafka.consumer = kafkaConsumer
  kafka.consume = kafkaConsumer.consume.bind(kafkaConsumer)
  kafka.subscribe = kafkaConsumer.subscribe.bind(kafkaConsumer)
  kafka.on = kafkaConsumer.on.bind(kafkaConsumer)

  // Resolve the consumer from the instance at close time, not from the closure.
  const onClose = (instance, done) => {
    instance.kafka.consumer.stop(done)
  }
  fastify.addHook('onClose', onClose)
}
// Wrap the plugin with fastify-plugin, declaring the Fastify version range it
// targets and its plugin name, then attach `default` and `fastifyKafka`
// aliases to the exported value alongside the plain CommonJS export.
const kafkaPlugin = fp(fastifyKafka, {
  name: '@fastify/kafka',
  fastify: '5.x'
})
kafkaPlugin.default = fastifyKafka
kafkaPlugin.fastifyKafka = fastifyKafka

module.exports = kafkaPlugin