This is a simple guide to RabbitMQ patterns in MacOS using Node.js based on RabbitMQ Tutorials. The steps on this guide may also be applied to other operating systems but be aware that installation and running of RabbitMQ binaries and services could be different. In a nutshell, this guide covers installation, execution and basic configuration of the RabbitMQ service in Node.js.
- Create
new_task.js
- Create
worker.js
- Running
worker.js
- Running
new_task.js
- Tasks running in sequence
- Message acknowledgements
- Message durability
- Check unacknowledged messages
- Message persisence
- Fair dispatch
- Summary
- Create
emit_log.js
- Create
receive_logs.js
- Running
receive_logs.js
- Running
emit_log.js
- Messages published to subscribers
- Check bindings
- Check types of exchanges
- Summary
- Create
emit_log_direct.js
- Create
receive_logs_direct.js
- Running
receive_logs_direct.js
- Running
emit_log_direct.js
- Summary
-
brew install rabbitmq
- As of writing, I expect that you will also have this issue:
Error: The `brew link` step did not complete successfully
The formula built, but is not symlinked into /usr/local
Could not symlink sbin/cuttlefish
/usr/local/sbin is not writable.
cd /usr/local
sudo mkdir sbin
sudo chown -R glenn:admin sbin
brew link rabbitmq
brew services start rabbitmq
This will send messages to a queue.
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the queue
const q = 'hello'
// Declare the queue
ch.assertQueue(q, { durable: false })
// Send message to the queue
ch.sendToQueue(q, new Buffer('Hello World!'))
console.log(" {x} Sent 'Hello World'")
// Close the connection and exit
setTimeout(() => {
conn.close()
process.exit(0)
}, 500)
})
})
# Terminal
sudo chmod 755 send.js
./send.js
This will receive messages from a queue.
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the queue
const q = 'hello'
// Declare the queue
ch.assertQueue(q, { durable: false })
// Wait for Queue Messages
console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
ch.consume( q, msg => {
console.log(` [x] Received ${msg.content.toString()}`)
}, { noAck: true }
)
})
})
# Terminal
sudo chmod 755 receive.js
./receive.js
This will list all queues present in the RabbitMQ service
/usr/local/sbin/rabbitmqctl list_queues
This will send a message to the next available queue.
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the queue
const q = 'task_queue_durable'
// Write a message
const msg = process.argv.slice(2).join(' ') || "Hello World!"
// Declare the queue
ch.assertQueue(q, { durable: true }) // { durable: true } ensures that the message will still be redelivered even if RabbitMQ service is turned off/restarted
// Send message to the queue
ch.sendToQueue(q, new Buffer(msg), {persistent: true}) // {persistent: true} saves the message to disk/cache
console.log(` {x} Sent '${msg}'`)
// Close the connection and exit
setTimeout(() => {
conn.close()
process.exit(0)
}, 500)
})
})
This will consume messages when it's the next worker in the round-robin dispatch.
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the queue
const q = 'task_queue_durable'
// Declare the queue
ch.assertQueue(q, { durable: true }) // { durable: true } ensures that the message will still be redelivered even if RabbitMQ service is turned off/restarted
// Tell RabbitMQ not to give more than 1 message per worker
ch.prefetch(1)
// Wait for Queue Messages
console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
ch.consume( q, msg => {
// Just to simulate a fake task, length of dots in the message
// is the number of secs the task will run
const secs = msg.content.toString().split('.').length - 1
console.log(` [x] Received ${msg.content.toString()}`)
console.log(` [x] Task will run for ${secs} secs`)
// Fake task which simulates execution time
setTimeout(() => {
console.log(` [x] Done ${msg.content.toString()}`);
// Send acknowledgment
ch.ack(msg)
}, secs * 1000)
}, { noAck: false } // noAck: false means Message acknowledgments is turned on
// When message acknowledgements are turned on, even if a worker.js is killed (Ctrl+C)
// while processing a message, it will be redelivered
)
})
})
# terminal 1
sudo chmod 755 worker.js
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# terminal 2
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
sudo chmod 755 new_task.js
./new_task.js First message.
./new_task.js Second message..
./new_task.js Third message...
./new_task.js Fourth message....
./new_task.js Fifth message.....
This step was already done so there is nothing else to change anything in worker.js.
{ noAck: false } // noAck: false means Message acknowledgments is turned on
// When message acknowledgements are turned on, even if a worker.js is killed (Ctrl+C)
// while processing a message, it will be redelivered
This step was already done so there is nothing else to change anything in worker.js.
ch.assertQueue(q, { durable: true }) // { durable: true } ensures that the message will still be redelivered even if RabbitMQ service is turned off/restarted
// Send acknowledgment
ch.ack(msg)
To test this, comment out ch.ack(msg)
. Messages will not be acknowledge with this commented.
/usr/local/sbin/rabbitmqctl list_queues name messages_ready messages_unacknowledged
As requirement to #7, persistence needs to be set to true.
ch.sendToQueue(q, new Buffer(msg), {persistent: true}) // {persistent: true} saves the message to disk/cache
// Tell RabbitMQ not to give more than 1 message per worker
ch.prefetch(1)
- The queue is now capable of round robin dispatch of messages.
- Message acknowledgement is turned on which means that if a worker dies, the message will be redelivered if not already acknowledged.
- Message persistence is turned on which means that if the RabbitMQ is killed or restarted, the message will still be written on disk or cache.
- Fair dispatch is enabled which means, the consumer will not process more than X messages per worker at the risk of filling up a queue.
This will publish messages to an exchange and deliver the messages to all queues bound to that exchange.
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the exchange
const ex = 'logs'
// Write a message
const msg = process.argv.slice(2).join(' ') || "Hello World!"
// Declare the exchange
ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows
// Send message to the exchange
ch.publish(ex, '', new Buffer(msg)) // '' empty string means that message will not be sent to a specific queue
console.log(` {x} Sent '${msg}'`)
// Close the connection and exit
setTimeout(() => {
conn.close()
process.exit(0)
}, 500)
})
})
- We are now declaring an exchange instead of a queue
// Name of the exchange
const ex = 'logs'
// Declare the exchange
ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows
- We now publish messages instead of sending a message directly to a queue
// Send message to the exchange
ch.publish(ex, '', new Buffer(msg)) // '' empty string means that message will not be sent to a specific queue
Every instance of this will subscribe to an exchange and receive messages from the queue.
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the exchange
const ex = 'logs'
// Declare the exchange
ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows
// Declare the queues
ch.assertQueue('', {exclusive: true}, (err, q) => {
// Wait for Queue Messages
console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
// Tell exchange to send messages to queue
ch.bindQueue(q.queue, ex, '')
// Consume queue messages
ch.consume(q.queue, msg => {
console.log(` [x] ${msg.content.toString()}`)
}, {noAck: true})
})
})
})
- We are now declaring an exchange instead of a queue
// Name of the exchange
const ex = 'logs'
// Declare the exchange
ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows
- We create a non-durable queue with a generated queue name by specifying an empty string.
ch.assertQueue('', {exclusive: true}, (err, q) => {
- We bind the exchange to the queue
// Tell exchange to send messages to queue
ch.bindQueue(q.queue, ex, '')
- The consumer consumes messages to every queue in the exchange (i.e. Sent to all running queues)
// Consume queue messages
ch.consume(q.queue, msg => {
console.log(` [x] ${msg.content.toString()}`)
}, {noAck: true})
#Terminal 1
sudo chmod 755 receive_logs.js
./receive_logs.js
#Terminal 2
./receive_logs.js
#Terminal 3
sudo chmod 755 emit_log.js
./emit_log.js
/usr/local/sbin/rabbitmqctl list_bindings
Check them by typing the following in your terminal:
/usr/local/sbin/rabbitmqctl list_exchanges
- We created subscribers that creates queues with randomly generated names and bind those to an exchange.
- We created a publisher that sends messages to an exchange and consumes the message to all queues bound to that exchange.
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the exchange
const ex = 'direct_logs'
// Store message as args
const args = process.argv.slice(2)
// Write a message
const msg = args.slice(1).join(' ') || "Hello World!"
// Use severity as the binding key
const severity = (args.length > 0) ? args[0] : 'info'
// Declare the exchange
ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)
// Send message to the exchange
ch.publish(ex, severity, new Buffer(msg)) // '' empty string means that message will not be sent to a specific queue
console.log(` {x} Sent ${severity}: '${msg}'`)
// Close the connection and exit
setTimeout(() => {
conn.close()
process.exit(0)
}, 500)
})
})
- We identify a binding key as part of the run arguments. For this example, we are using
severity
which can either have a value ofinfo
,warning
, orerror
.
// Store message as args
const args = process.argv.slice(2)
// Write a message
const msg = args.slice(1).join(' ') || "Hello World!"
// Use severity as the binding key
const severity = (args.length > 0) ? args[0] : 'info'
- The exchange is now declared as
direct
.
// Declare the exchange
ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)
- Publishing the message now uses a binding key
severity
.
// Send message to the exchange
ch.publish(ex, severity, new Buffer(msg)) // '' empty string means that message will not be sent to a specific queue
console.log(` {x} Sent ${severity}: '${msg}'`)
#!/usr/bin/env node
const amqp = require('amqplib/callback_api')
// Store message as args
const args = process.argv.slice(2);
// Choose what type of binding key `receive_logs_direct.js` is going to use
// If no arguments were provided, output instructions
if (args.length == 0) {
console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
process.exit(1);
}
// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
// Create channel
conn.createChannel((err, ch) => {
// Name of the exchange
const ex = 'direct_logs'
// Declare the exchange
ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)
// Declare the queues
ch.assertQueue('', {exclusive: true}, (err, q) => {
// Wait for Queue Messages
console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
// For each binding key, tell exchange to send messages to queue
args.forEach( severity => {
ch.bindQueue(q.queue, ex, severity)
})
// Consume queue messages
ch.consume(q.queue, msg => {
console.log(` [x] ${msg.fields.routingKey}: ${msg.content.toString()}`)
}, {noAck: true})
})
})
})
- We now require the user to specify a binding key as part of the run arguments. If no binding key is provided, an instruction will be outputted to the user.
// If no arguments were provided, output instructions
if (args.length == 0) {
console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
process.exit(1);
}
- The exchange is now declared as
direct
.
// Declare the exchange
ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)
- The exchange is bound to queues by binding key
// For each binding key, tell exchange to send messages to queue
args.forEach( severity => {
ch.bindQueue(q.queue, ex, severity)
})
- The output message now outputs the routing key (equivalent value to binding key)
// Consume queue messages
ch.consume(q.queue, msg => {
console.log(` [x] ${msg.fields.routingKey}: ${msg.content.toString()}`)
}, {noAck: true})
# Terminal 1 - Will display info messages only
chmod 755 receive_logs_direct.js
./receive_logs_direct.js info
# Terminal 2 - Will display warning and error messages
./receive_logs_direct.js warning error
# Terminal 3
chmod 755 emit_log_direct.js
./emit_log_direct.js info This is an info message
./emit_log_direct.js warning This is a warning message
./emit_log_direct.js error This is an error message
- We are now publishing and subscribing to messages using a
direct
exchange. - A
direct
exchange uses abinding key
(which has an equivalent value to arouting key
). This key will tell the subscribers to listen only to a specific queue (similar to a channel). - Receiving logs require that a binding key is specified before runtime. The queue will only receive messages from publishers with a similar binding keys which is also specified before runtime.