rabbitmq-adonis-v6 is a RabbitMQ provider for Adonis.
Install rabbitmq-adonis-v6:
npm i rabbitmq-adonis-v6
Then:
node ace configure rabbitmq-adonis-v6
This will create start/rabbit.ts.
It will also create config/rabbit.ts and add the following fields to your .env:
RABBITMQ_HOSTNAME=
RABBITMQ_USER=
RABBITMQ_PASSWORD=
RABBITMQ_PORT=
RABBITMQ_PROTOCOL="amqp" // or amqps
Make sure to set the correct values for these environment variables so rabbitmq-adonis-v6 can connect.
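As a rough illustration of how these five fields fit together, the sketch below combines them into a single AMQP connection URL and fails early when one is missing. The `buildAmqpUrl` helper and the `RabbitEnv` type are hypothetical and not part of rabbitmq-adonis-v6, which may assemble its connection differently; the sketch also assumes the password variable is spelled RABBITMQ_PASSWORD.

```typescript
// Hypothetical helper, not part of rabbitmq-adonis-v6: it only illustrates
// how the five .env fields combine into one AMQP connection URL.
type RabbitEnv = Record<string, string | undefined>

function buildAmqpUrl(env: RabbitEnv): string {
  const required = [
    'RABBITMQ_HOSTNAME',
    'RABBITMQ_USER',
    'RABBITMQ_PASSWORD',
    'RABBITMQ_PORT',
    'RABBITMQ_PROTOCOL',
  ]
  // Report every missing or empty variable at once.
  const missing = required.filter((key) => !env[key])
  if (missing.length > 0) {
    throw new Error(`Missing RabbitMQ variables: ${missing.join(', ')}`)
  }
  const protocol = env.RABBITMQ_PROTOCOL
  if (protocol !== 'amqp' && protocol !== 'amqps') {
    throw new Error(`RABBITMQ_PROTOCOL must be "amqp" or "amqps", got "${protocol}"`)
  }
  // Credentials are percent-encoded so characters like "@" do not break the URL.
  const user = encodeURIComponent(env.RABBITMQ_USER as string)
  const password = encodeURIComponent(env.RABBITMQ_PASSWORD as string)
  return `${protocol}://${user}:${password}@${env.RABBITMQ_HOSTNAME}:${env.RABBITMQ_PORT}`
}
```

Calling `buildAmqpUrl(process.env)` during application start is one way to catch a misspelled or empty variable before the first connection attempt.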
import { Rabbit } from 'rabbitmq-adonis-v6'
import router from '@adonisjs/core/services/router'
router.get('/', async () => {
  // Ensures the queue exists
  await Rabbit.assertQueue('my_queue')
  // Sends a message to the queue
  await Rabbit.sendToQueue('my_queue', 'This message was sent by adonis-rabbit')
})
Note that it doesn't really make sense to subscribe to a queue inside a controller; usually this is done through a preload file.
Inside start/rabbit.ts:
import { Rabbit } from 'rabbitmq-adonis-v6'
async function listen() {
  await Rabbit.assertQueue('my_queue')
  await Rabbit.consumeFrom('my_queue', (message) => {
    console.log(message.content)
  })
}
listen()
This will log every message sent to the queue my_queue.
import { Rabbit } from 'rabbitmq-adonis-v6'
await Rabbit.assertQueue('myQueue')
Asserts that the queue is created.
Parameters:
queueName: the name of the queue
options?: the queue options
await Rabbit.assertExchange('myExchange', 'type')
Asserts that the exchange is created.
Parameters:
exchangeName: the name of the exchange
type: the type of the exchange
options?: the exchange options
await Rabbit.bindQueue('myQueue', 'myExchange', '')
Binds a queue to an exchange.
Parameters:
queueName: the name of the queue
exchangeName: the name of the exchange
pattern?: the pattern (defaults to '')
await Rabbit.sendToQueue('myQueue', 'content')
Sends a message to a queue.
Parameters:
queueName: the name of the queue
content: the content to be sent to the queue
options: the options
Note that the content parameter doesn't need to be a Buffer; Adonis RabbitMQ will automatically convert it to a Buffer if it isn't one already.
You also don't have to JSON.stringify an object; Adonis RabbitMQ will do that for you (it will be converted to JSON and then to a Buffer).
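The conversion rules just described can be sketched as follows. This is not the package's actual implementation; `toBuffer` is a hypothetical name, and the sketch only mirrors the stated behavior: a Buffer is passed through, a string becomes a Buffer, and anything else is JSON-encoded first.

```typescript
// Hypothetical helper, not the package's actual code: it mirrors the
// conversion rules described above.
function toBuffer(content: unknown): Buffer {
  // Already a Buffer: pass it through untouched.
  if (Buffer.isBuffer(content)) {
    return content
  }
  // A string is encoded directly, without JSON quoting.
  if (typeof content === 'string') {
    return Buffer.from(content, 'utf8')
  }
  // Anything else goes through JSON first, then into a Buffer.
  const json = JSON.stringify(content)
  if (json === undefined) {
    // JSON.stringify returns undefined for values such as `undefined` or functions.
    throw new TypeError('Content cannot be represented as JSON')
  }
  return Buffer.from(json, 'utf8')
}
```

Under these assumptions, `toBuffer({ id: 1 })` holds the bytes of the JSON text `{"id":1}`, which a consumer can read back as an object.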
await Rabbit.sendToExchange('myExchange', 'myRoutingKey', 'content')
Sends a message to an exchange.
Parameters:
exchangeName: the name of the exchange
routingKey: the routing key
content: the content to send to the exchange
options: the options
Note that the content parameter doesn't need to be a Buffer; Adonis RabbitMQ will automatically convert it to a Buffer if it isn't one already.
You also don't have to JSON.stringify an object; Adonis RabbitMQ will do that for you (it will be converted to JSON and then to a Buffer).
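To show how the exchange-related calls might be combined, here is a minimal sketch that declares an exchange, binds a queue to it, and publishes one message. It is typed against a small hypothetical `RabbitLike` interface that lists only the methods documented above, with assumed parameter and return types; the function name and the exchange, queue, and routing key names are invented for illustration.

```typescript
// Minimal hypothetical interface: only the methods documented above, with
// loosely assumed parameter and return types.
interface RabbitLike {
  assertExchange(exchangeName: string, type: string): Promise<unknown>
  assertQueue(queueName: string): Promise<unknown>
  bindQueue(queueName: string, exchangeName: string, pattern?: string): Promise<unknown>
  sendToExchange(exchangeName: string, routingKey: string, content: unknown): Promise<unknown>
}

// Declares a direct exchange, binds a queue to it, then publishes one message.
// The names 'orders', 'orders_created' and 'order.created' are made up.
async function publishOrderCreated(rabbit: RabbitLike, orderId: number): Promise<void> {
  await rabbit.assertExchange('orders', 'direct')
  await rabbit.assertQueue('orders_created')
  // With a direct exchange, the binding pattern must equal the routing key.
  await rabbit.bindQueue('orders_created', 'orders', 'order.created')
  // A plain object is passed; per the note above it is JSON-encoded for you.
  await rabbit.sendToExchange('orders', 'order.created', { orderId })
}
```

With the real provider the call would look like `publishOrderCreated(Rabbit, 42)`, assuming its method signatures are compatible with this interface.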
await Rabbit.consumeFrom('myQueue', (message) => {
  console.log(message.content)
  message.ack()
})
Consumes messages from a queue.
Parameters:
queueName: the name of the queue
onMessage: the callback that is executed when a message is received
The onMessage callback receives a Message instance as its parameter.
await Rabbit.ackAll()
Acknowledges all the messages.
await Rabbit.nackAll()
Rejects all the messages.
Parameters:
requeue?: adds the rejected messages to the queue again
Retrieves the amqplib Connection instance. If there is no connection, one will be created.
await Rabbit.getConnection()
Retrieves the amqplib Channel instance. If there is no connection, one will be created; if there is no channel, one will be created too.
await Rabbit.getChannel()
Indicates whether there is an active connection.
await Rabbit.validateConnection()
Closes the channel.
Closes the connection.
When consuming messages through consumeFrom, you'll receive a Message instance in the callback.
This is slightly different from the amqplib approach. For example:
Rabbit.consumeFrom('queue', (message) => {
  // Acknowledges the message
  message.ack()
  // Rejects the message
  message.reject()
  // The message content
  console.log(message.content)
  // If you're expecting a JSON, this will return the parsed message
  console.log(message.jsonContent)
})
message.content
Returns the message content.
message.jsonContent
If the message is expected to be in JSON format, you can use message.jsonContent to get the message parsed as an object.
message.fields
The message fields.
message.properties
The message properties.
message.ack()
Acknowledges the message.
Parameters:
allUpTo?: acknowledges all the messages up to this one
message.nack()
Rejects the message.
Parameters:
allUpTo?: rejects all the messages up to this one
requeue?: adds the rejected messages to the queue again
message.reject()
Rejects the message. Equivalent to nack, but works in older versions of RabbitMQ where nack does not.
Parameters:
requeue?: adds the rejected messages to the queue again
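A common way to combine these methods is to acknowledge a message only after it has been processed and to reject it otherwise. The sketch below illustrates that pattern against a hypothetical `MessageLike` interface containing only the members described above; `handleMessage` is an invented helper, and the `nack(allUpTo, requeue)` argument order is assumed from the order in which the parameters are listed.

```typescript
// Minimal hypothetical shape: only the Message members described above.
interface MessageLike {
  jsonContent: unknown
  ack(allUpTo?: boolean): void
  nack(allUpTo?: boolean, requeue?: boolean): void
}

// Acknowledges the message when `work` succeeds; otherwise rejects it
// without requeueing, so a permanently bad message is not redelivered forever.
function handleMessage(message: MessageLike, work: (payload: unknown) => void): boolean {
  try {
    // Reading jsonContent sits inside the try in case parsing throws.
    work(message.jsonContent)
    message.ack()
    return true
  } catch {
    // Assumed argument order: allUpTo first, then requeue.
    message.nack(false, false)
    return false
  }
}
```

Inside a consumer this might be used as `Rabbit.consumeFrom('my_queue', (message) => handleMessage(message, console.log))`, assuming the real Message instance is compatible with this shape. Whether to requeue on failure is a design choice; passing `true` as the second `nack` argument would put the message back on the queue instead.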
In your .env, set the hostname to the Docker service container name, for example:
RABBITMQ_HOSTNAME={your_docker_container_service_name}
Part of the code for this package has been taken and adapted from the RabbitMQ package for Adonis v5. You can find the original package here.