diff --git a/lib/index.js b/lib/index.js index bc155a2..bf164f8 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,10 +1,5 @@ 'use strict'; -Object.defineProperty(exports, "__esModule", { - value: true -}); -exports.connect = connect; - var _AmqpConnectionManager = require('./AmqpConnectionManager'); var _AmqpConnectionManager2 = _interopRequireDefault(_AmqpConnectionManager); @@ -15,22 +10,31 @@ var _path2 = _interopRequireDefault(_path); function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } +/** check for logger in the global scope. */ +const logger = global.logger ? global.logger : { log: console.log, error: console.error }; + function connect(urls, options) { return new _AmqpConnectionManager2.default(urls, options); } -const amqp = { - connect -}; - -// export default amqp; const env = process.env.NODE_ENV || 'development'; const currentPath = process.cwd(); + const config = require(_path2.default.join(currentPath, 'config', 'env', `${env}`)); -console.log(config); -// const QUEUE_NAME = 'amqp-connection-manager-sample' -const { host, port, username, password, vhost, protocol = 'amqp' } = config.rabbitMQ; +const { + host, + port, + username, + password, + vhost = '/', + protocol = 'amqp', + prefetch = 2, + heartbeatInterval = 5, + reconnectTime = 10, + options = {}, + defaultQueueFeatures = { durable: true } +} = config.rabbitMQ; // Handle an incomming message. // const onMessage = function(channelWrapper, data) { @@ -49,44 +53,55 @@ const connectionUrl = `${protocol}://${username}:${password}@${host}:${port}/${v */ // Create a connetion manager -const connection = connect([connectionUrl], { json: true }); - -console.log(connection); +const connection = connect([connectionUrl], { + json: true, + heartbeatIntervalInSeconds: heartbeatInterval, + reconnectTimeInSeconds: reconnectTime, + connectionOptions: options +}); connection.on('connect', () => { - console.log('Connected!'); + logger.log('data', { note: 'Connected to RabbitMQ server' }); }); + connection.on('disconnect', params => { - console.log('Disconnected.', params.err.stack); + logger.log('error', { error: params.err, note: 'RabbitMQ server is disconnected' }); }); /** * Consumer. * - * @param {string} queueName - name of queue. + * @param {object} params - object with queue name and queue options. * @param {function} [handler] - callback. * @returns {void | Promise} - Resolves when complete. */ -const consume = (queueName, handler) => { +const consume = (params = {}, handler) => { + const queueName = params.queue && params.queue.name; + const queueOptions = params.queue.options || defaultQueueFeatures; + + if (!queueName) { + return Promise.reject(new Error('Queue name is missing')); + } + // Set up a channel listening for messages in the queue. const channelWrapper = connection.createChannel({ setup(channel) { // `channel` here is a regular amqplib `ConfirmChannel`. - return Promise.all([channel.assertQueue(queueName, { durable: true }), channel.prefetch(1), + return Promise.all([channel.assertQueue(queueName, queueOptions), channel.prefetch(prefetch), // channel.consume(queueName, handler.bind(null, channelWrapper)) channel.consume(queueName, data => { const message = JSON.parse(data.content.toString()); handler(message).then(() => channelWrapper.ack(data)).catch(() => {}); }, { noAck: false })]).catch(e => { - console.error(e); + logger.log('error', { error: e, note: 'error from consume' }); }); } }); - // return channelWrapper; + /** start the consumer */ return channelWrapper.waitForConnect().then(() => { - console.log(`Listening for messages on ${queueName}`); + logger.log('data', { note: `Consumption from ${queueName} started!` }); }); }; @@ -97,7 +112,6 @@ const consume = (queueName, handler) => { * @param {object} [data] - data to be published. */ const publish = (queueName, data) => { - console.log(queueName, data); const channelWrapper = connection.createChannel({ json: true, setup(channel) { @@ -109,13 +123,10 @@ const publish = (queueName, data) => { // Send messages until someone hits CTRL-C or something goes wrong... const startPublishing = () => { channelWrapper.sendToQueue(queueName, data, { persistent: true }).then(() => { - console.log('Message sent'); - // return wait(1000); + logger.log('data', { note: `Message sent to queue ${queueName}` }); return null; - }).then(() => - // return sendMessage(); - Promise.resolve()).catch(err => { - console.log('Message was rejected:', err.stack); + }).catch(err => { + logger.log('error', { note: 'Message was rejected', error: err, custom: { data } }); channelWrapper.close(); connection.close(); }); @@ -130,7 +141,7 @@ const purgeQueue = queueName => { const channelWrapper = connection.createChannel({ setup(channel) { // `channel` here is a regular amqplib `ConfirmChannel`. - return Promise.all([channel.assertQueue(queueName, { durable: true }), channel.purgeQueue(queueName)]); + return Promise.all([channel.assertQueue(queueName, defaultQueueFeatures), channel.purgeQueue(queueName)]); } }); @@ -154,6 +165,4 @@ module.exports = { purgeQueue, ackAll }; - -exports.default = amqp; //# sourceMappingURL=index.js.map \ No newline at end of file diff --git a/lib/index.js.map b/lib/index.js.map index e4ffe20..8c3276b 100644 --- a/lib/index.js.map +++ b/lib/index.js.map @@ -1 +1 @@ -{"version":3,"sources":["../src/index.js"],"names":["connect","urls","options","AmqpConnectionManager","amqp","env","process","NODE_ENV","currentPath","cwd","config","require","path","join","console","log","host","port","username","password","vhost","protocol","rabbitMQ","connectionUrl","connection","json","on","params","err","stack","consume","queueName","handler","channelWrapper","createChannel","setup","channel","Promise","all","assertQueue","durable","prefetch","data","message","JSON","parse","content","toString","then","ack","catch","noAck","e","error","waitForConnect","publish","startPublishing","sendToQueue","persistent","resolve","close","purgeQueue","ackAll","module","exports"],"mappings":";;;;;QAGgBA,O,GAAAA,O;;AAHhB;;;;AACA;;;;;;AAEO,SAASA,OAAT,CAAiBC,IAAjB,EAAuBC,OAAvB,EAAgC;AACnC,SAAO,IAAIC,+BAAJ,CAA0BF,IAA1B,EAAgCC,OAAhC,CAAP;AACH;;AAED,MAAME,OAAO;AACTJ;AADS,CAAb;;AAIA;AACA,MAAMK,MAAMC,QAAQD,GAAR,CAAYE,QAAZ,IAAwB,aAApC;AACA,MAAMC,cAAcF,QAAQG,GAAR,EAApB;AACA,MAAMC,SAASC,QAAQC,eAAKC,IAAL,CAAUL,WAAV,EAAuB,QAAvB,EAAiC,KAAjC,EAAyC,GAAEH,GAAI,EAA/C,CAAR,CAAf;AACAS,QAAQC,GAAR,CAAYL,MAAZ;;AAEA;AACA,MAAM,EAAEM,IAAF,EAAQC,IAAR,EAAcC,QAAd,EAAwBC,QAAxB,EAAkCC,KAAlC,EAAyCC,WAAW,MAApD,KAA+DX,OAAOY,QAA5E;;AAEA;AACA;AACA;AACA;AACA;AACA;;AAEA,MAAMC,gBAAiB,GAAEF,QAAS,MAAKH,QAAS,IAAGC,QAAS,IAAGH,IAAK,IAAGC,IAAK,IAAGG,KAAM,EAArF;;AAEA;;;;;;;AAOA;AACA,MAAMI,aAAaxB,QACjB,CAACuB,aAAD,CADiB,EAEjB,EAAEE,MAAM,IAAR,EAFiB,CAAnB;;AAKAX,QAAQC,GAAR,CAAYS,UAAZ;;AAEAA,WAAWE,EAAX,CAAc,SAAd,EAAyB,MAAM;AAC7BZ,UAAQC,GAAR,CAAY,YAAZ;AACD,CAFD;AAGAS,WAAWE,EAAX,CAAc,YAAd,EAA4BC,UAAU;AACpCb,UAAQC,GAAR,CAAY,eAAZ,EAA6BY,OAAOC,GAAP,CAAWC,KAAxC;AACD,CAFD;;AAIA;;;;;;;AAOA,MAAMC,UAAU,CAACC,SAAD,EAAYC,OAAZ,KAAwB;AACtC;AACA,QAAMC,iBAAiBT,WAAWU,aAAX,CAAyB;AAC9CC,UAAMC,OAAN,EAAe;AACb;AACA,aAAOC,QAAQC,GAAR,CAAY,CACjBF,QAAQG,WAAR,CAAoBR,SAApB,EAA+B,EAAES,SAAS,IAAX,EAA/B,CADiB,EAEjBJ,QAAQK,QAAR,CAAiB,CAAjB,CAFiB;AAGjB;AACAL,cAAQN,OAAR,CACEC,SADF,EAEEW,QAAQ;AACN,cAAMC,UAAUC,KAAKC,KAAL,CAAWH,KAAKI,OAAL,CAAaC,QAAb,EAAX,CAAhB;;AAEAf,gBAAQW,OAAR,EAAiBK,IAAjB,CAAsB,MAAMf,eAAegB,GAAf,CAAmBP,IAAnB,CAA5B,EAAsDQ,KAAtD,CAA4D,MAAM,CAAE,CAApE;AACD,OANH,EAOE,EAAEC,OAAO,KAAT,EAPF,CAJiB,CAAZ,EAaJD,KAbI,CAaEE,KAAK;AACZtC,gBAAQuC,KAAR,CAAcD,CAAd;AACD,OAfM,CAAP;AAgBD;AAnB6C,GAAzB,CAAvB;;AAsBA;AACA,SAAOnB,eAAeqB,cAAf,GAAgCN,IAAhC,CAAqC,MAAM;AAChDlC,YAAQC,GAAR,CAAa,6BAA4BgB,SAAU,EAAnD;AACD,GAFM,CAAP;AAGD,CA5BD;;AA8BA;;;;;;AAMA,MAAMwB,UAAU,CAACxB,SAAD,EAAYW,IAAZ,KAAqB;AACnC5B,UAAQC,GAAR,CAAYgB,SAAZ,EAAuBW,IAAvB;AACA,QAAMT,iBAAiBT,WAAWU,aAAX,CAAyB;AAC9CT,UAAM,IADwC;AAE9CU,UAAMC,OAAN,EAAe;AACb;AACA,aAAOA,QAAQG,WAAR,CAAoBR,SAApB,EAA+B,EAAES,SAAS,IAAX,EAA/B,CAAP;AACD;AAL6C,GAAzB,CAAvB;;AAQA;AACA,QAAMgB,kBAAkB,MAAM;AAC5BvB,mBACGwB,WADH,CACe1B,SADf,EAC0BW,IAD1B,EACgC,EAAEgB,YAAY,IAAd,EADhC,EAEGV,IAFH,CAEQ,MAAM;AACVlC,cAAQC,GAAR,CAAY,cAAZ;AACA;AACA,aAAO,IAAP;AACD,KANH,EAOGiC,IAPH,CAOQ;AACJ;AACAX,YAAQsB,OAAR,EATJ,EAWGT,KAXH,CAWStB,OAAO;AACZd,cAAQC,GAAR,CAAY,uBAAZ,EAAqCa,IAAIC,KAAzC;AACAI,qBAAe2B,KAAf;AACApC,iBAAWoC,KAAX;AACD,KAfH;AAgBD,GAjBD;;AAmBAJ;;AAEA;AACD,CAjCD;;AAmCA,MAAMK,aAAa9B,aAAa;AAC9B,QAAME,iBAAiBT,WAAWU,aAAX,CAAyB;AAC9CC,UAAMC,OAAN,EAAe;AACb;AACA,aAAOC,QAAQC,GAAR,CAAY,CACjBF,QAAQG,WAAR,CAAoBR,SAApB,EAA+B,EAAES,SAAS,IAAX,EAA/B,CADiB,EAEjBJ,QAAQyB,UAAR,CAAmB9B,SAAnB,CAFiB,CAAZ,CAAP;AAID;AAP6C,GAAzB,CAAvB;;AAUA,SAAOE,cAAP;AACD,CAZD;;AAcA,MAAM6B,SAAS,MAAM;AACnB,QAAM7B,iBAAiBT,WAAWU,aAAX,CAAyB;AAC9CC,UAAMC,OAAN,EAAe;AACb;AACA,aAAOA,QAAQ0B,MAAR,EAAP;AACD;AAJ6C,GAAzB,CAAvB;;AAOA,SAAO7B,cAAP;AACD,CATD;;AAWA8B,OAAOC,OAAP,GAAiB;AACfT,SADe;AAEfzB,SAFe;AAGf+B,YAHe;AAIfC;AAJe,CAAjB;;kBAOe1D,I","file":"index.js","sourcesContent":["import AmqpConnectionManager from './AmqpConnectionManager';\nimport path from 'path';\n\nexport function connect(urls, options) {\n return new AmqpConnectionManager(urls, options);\n}\n\nconst amqp = {\n connect\n};\n\n// export default amqp;\nconst env = process.env.NODE_ENV || 'development';\nconst currentPath = process.cwd();\nconst config = require(path.join(currentPath, 'config', 'env', `${env}`));\nconsole.log(config);\n\n// const QUEUE_NAME = 'amqp-connection-manager-sample'\nconst { host, port, username, password, vhost, protocol = 'amqp' } = config.rabbitMQ;\n\n// Handle an incomming message.\n// const onMessage = function(channelWrapper, data) {\n// const message = JSON.parse(data.content.toString());\n// console.log(\"receiver: got message\", message);\n// channelWrapper.ack(data);\n// }\n\nconst connectionUrl = `${protocol}://${username}:${password}@${host}:${port}/${vhost}`;\n\n/**\n * options.heartbeatIntervalInSeconds - Interval to send heartbeats to broker. Defaults to 5 seconds.\n * options.reconnectTimeInSeconds - The time to wait before trying to reconnect. If not specified, defaults to heartbeatIntervalInSeconds.\n * options.findServers(callback) is a function which returns one or more servers to connect to. This should return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism such as Consul or etcd. Instead of taking a callback, this can also return a Promise. Note that if this is supplied, then urls is ignored.\n * options.connectionOptions is passed as options to the amqplib connect method.\n */\n\n// Create a connetion manager\nconst connection = connect(\n [connectionUrl],\n { json: true }\n);\n\nconsole.log(connection);\n\nconnection.on('connect', () => {\n console.log('Connected!');\n});\nconnection.on('disconnect', params => {\n console.log('Disconnected.', params.err.stack);\n});\n\n/**\n * Consumer.\n *\n * @param {string} queueName - name of queue.\n * @param {function} [handler] - callback.\n * @returns {void | Promise} - Resolves when complete.\n */\nconst consume = (queueName, handler) => {\n // Set up a channel listening for messages in the queue.\n const channelWrapper = connection.createChannel({\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return Promise.all([\n channel.assertQueue(queueName, { durable: true }),\n channel.prefetch(1),\n // channel.consume(queueName, handler.bind(null, channelWrapper))\n channel.consume(\n queueName,\n data => {\n const message = JSON.parse(data.content.toString());\n\n handler(message).then(() => channelWrapper.ack(data)).catch(() => {});\n },\n { noAck: false }\n )\n ]).catch(e => {\n console.error(e);\n });\n }\n });\n\n // return channelWrapper;\n return channelWrapper.waitForConnect().then(() => {\n console.log(`Listening for messages on ${queueName}`);\n });\n};\n\n/**\n * Publisher.\n *\n * @param {string} queueName - name of queue.\n * @param {object} [data] - data to be published.\n */\nconst publish = (queueName, data) => {\n console.log(queueName, data);\n const channelWrapper = connection.createChannel({\n json: true,\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return channel.assertQueue(queueName, { durable: true });\n }\n });\n\n // Send messages until someone hits CTRL-C or something goes wrong...\n const startPublishing = () => {\n channelWrapper\n .sendToQueue(queueName, data, { persistent: true })\n .then(() => {\n console.log('Message sent');\n // return wait(1000);\n return null;\n })\n .then(() =>\n // return sendMessage();\n Promise.resolve()\n )\n .catch(err => {\n console.log('Message was rejected:', err.stack);\n channelWrapper.close();\n connection.close();\n });\n };\n\n startPublishing();\n\n // return sendMessage;\n};\n\nconst purgeQueue = queueName => {\n const channelWrapper = connection.createChannel({\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return Promise.all([\n channel.assertQueue(queueName, { durable: true }),\n channel.purgeQueue(queueName)\n ]);\n }\n });\n\n return channelWrapper;\n};\n\nconst ackAll = () => {\n const channelWrapper = connection.createChannel({\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return channel.ackAll();\n }\n });\n\n return channelWrapper;\n};\n\nmodule.exports = {\n publish,\n consume,\n purgeQueue,\n ackAll\n};\n\nexport default amqp;\n"]} \ No newline at end of file +{"version":3,"sources":["../src/index.js"],"names":["logger","global","log","console","error","connect","urls","options","AmqpConnectionManager","env","process","NODE_ENV","currentPath","cwd","config","require","path","join","host","port","username","password","vhost","protocol","prefetch","heartbeatInterval","reconnectTime","defaultQueueFeatures","durable","rabbitMQ","connectionUrl","connection","json","heartbeatIntervalInSeconds","reconnectTimeInSeconds","connectionOptions","on","note","params","err","consume","handler","queueName","queue","name","queueOptions","Promise","reject","Error","channelWrapper","createChannel","setup","channel","all","assertQueue","data","message","JSON","parse","content","toString","then","ack","catch","noAck","e","waitForConnect","publish","startPublishing","sendToQueue","persistent","custom","close","purgeQueue","ackAll","module","exports"],"mappings":";;AAAA;;;;AACA;;;;;;AAEA;AACA,MAAMA,SAASC,OAAOD,MAAP,GAAgBC,OAAOD,MAAvB,GAAgC,EAAEE,KAAKC,QAAQD,GAAf,EAAoBE,OAAOD,QAAQC,KAAnC,EAA/C;;AAEA,SAASC,OAAT,CAAiBC,IAAjB,EAAuBC,OAAvB,EAAgC;AAC5B,SAAO,IAAIC,+BAAJ,CAA0BF,IAA1B,EAAgCC,OAAhC,CAAP;AACH;;AAED,MAAME,MAAMC,QAAQD,GAAR,CAAYE,QAAZ,IAAwB,aAApC;AACA,MAAMC,cAAcF,QAAQG,GAAR,EAApB;;AAEA,MAAMC,SAASC,QAAQC,eAAKC,IAAL,CAAUL,WAAV,EAAuB,QAAvB,EAAiC,KAAjC,EAAyC,GAAEH,GAAI,EAA/C,CAAR,CAAf;;AAEA,MAAM;AACFS,MADE;AAEFC,MAFE;AAGFC,UAHE;AAIFC,UAJE;AAKFC,UAAQ,GALN;AAMFC,aAAW,MANT;AAOFC,aAAW,CAPT;AAQFC,sBAAoB,CARlB;AASFC,kBAAgB,EATd;AAUFnB,YAAU,EAVR;AAWFoB,yBAAuB,EAAEC,SAAS,IAAX;AAXrB,IAYFd,OAAOe,QAZX;;AAcA;AACA;AACA;AACA;AACA;AACA;;AAEA,MAAMC,gBAAiB,GAAEP,QAAS,MAAKH,QAAS,IAAGC,QAAS,IAAGH,IAAK,IAAGC,IAAK,IAAGG,KAAM,EAArF;;AAEA;;;;;;;AAOA;AACA,MAAMS,aAAa1B,QACjB,CAACyB,aAAD,CADiB,EAEjB;AACEE,QAAM,IADR;AAEEC,8BAA4BR,iBAF9B;AAGES,0BAAwBR,aAH1B;AAIES,qBAAmB5B;AAJrB,CAFiB,CAAnB;;AAUAwB,WAAWK,EAAX,CAAc,SAAd,EAAyB,MAAM;AAC7BpC,SAAOE,GAAP,CAAW,MAAX,EAAmB,EAAEmC,MAAM,8BAAR,EAAnB;AACD,CAFD;;AAIAN,WAAWK,EAAX,CAAc,YAAd,EAA4BE,UAAU;AACpCtC,SAAOE,GAAP,CAAW,OAAX,EAAoB,EAAEE,OAAOkC,OAAOC,GAAhB,EAAqBF,MAAM,iCAA3B,EAApB;AACD,CAFD;;AAIA;;;;;;;AAOA,MAAMG,UAAU,CAACF,SAAS,EAAV,EAAcG,OAAd,KAA0B;AACxC,QAAMC,YAAYJ,OAAOK,KAAP,IAAgBL,OAAOK,KAAP,CAAaC,IAA/C;AACA,QAAMC,eAAeP,OAAOK,KAAP,CAAapC,OAAb,IAAwBoB,oBAA7C;;AAEA,MAAI,CAACe,SAAL,EAAgB;AACd,WAAOI,QAAQC,MAAR,CAAe,IAAIC,KAAJ,CAAU,uBAAV,CAAf,CAAP;AACD;;AAED;AACA,QAAMC,iBAAiBlB,WAAWmB,aAAX,CAAyB;AAC9CC,UAAMC,OAAN,EAAe;AACb;AACA,aAAON,QAAQO,GAAR,CAAY,CACjBD,QAAQE,WAAR,CAAoBZ,SAApB,EAA+BG,YAA/B,CADiB,EAEjBO,QAAQ5B,QAAR,CAAiBA,QAAjB,CAFiB;AAGjB;AACA4B,cAAQZ,OAAR,CACEE,SADF,EAEEa,QAAQ;AACN,cAAMC,UAAUC,KAAKC,KAAL,CAAWH,KAAKI,OAAL,CAAaC,QAAb,EAAX,CAAhB;;AAEAnB,gBAAQe,OAAR,EAAiBK,IAAjB,CAAsB,MAAMZ,eAAea,GAAf,CAAmBP,IAAnB,CAA5B,EAAsDQ,KAAtD,CAA4D,MAAM,CAAE,CAApE;AACD,OANH,EAOE,EAAEC,OAAO,KAAT,EAPF,CAJiB,CAAZ,EAaJD,KAbI,CAaEE,KAAK;AACZjE,eAAOE,GAAP,CAAW,OAAX,EAAoB,EAAEE,OAAO6D,CAAT,EAAY5B,MAAM,oBAAlB,EAApB;AACD,OAfM,CAAP;AAgBD;AAnB6C,GAAzB,CAAvB;;AAsBA;AACA,SAAOY,eAAeiB,cAAf,GAAgCL,IAAhC,CAAqC,MAAM;AAChD7D,WAAOE,GAAP,CAAW,MAAX,EAAmB,EAAEmC,MAAO,oBAAmBK,SAAU,WAAtC,EAAnB;AACD,GAFM,CAAP;AAGD,CAnCD;;AAqCA;;;;;;AAMA,MAAMyB,UAAU,CAACzB,SAAD,EAAYa,IAAZ,KAAqB;AACnC,QAAMN,iBAAiBlB,WAAWmB,aAAX,CAAyB;AAC9ClB,UAAM,IADwC;AAE9CmB,UAAMC,OAAN,EAAe;AACb;AACA,aAAOA,QAAQE,WAAR,CAAoBZ,SAApB,EAA+B,EAAEd,SAAS,IAAX,EAA/B,CAAP;AACD;AAL6C,GAAzB,CAAvB;;AAQA;AACA,QAAMwC,kBAAkB,MAAM;AAC5BnB,mBACGoB,WADH,CACe3B,SADf,EAC0Ba,IAD1B,EACgC,EAAEe,YAAY,IAAd,EADhC,EAEGT,IAFH,CAEQ,MAAM;AACV7D,aAAOE,GAAP,CAAW,MAAX,EAAmB,EAAEmC,MAAO,yBAAwBK,SAAU,EAA3C,EAAnB;AACA,aAAO,IAAP;AACD,KALH,EAMGqB,KANH,CAMSxB,OAAO;AACZvC,aAAOE,GAAP,CAAW,OAAX,EAAoB,EAAEmC,MAAM,sBAAR,EAAgCjC,OAAOmC,GAAvC,EAA4CgC,QAAQ,EAAEhB,IAAF,EAApD,EAApB;AACAN,qBAAeuB,KAAf;AACAzC,iBAAWyC,KAAX;AACD,KAVH;AAWD,GAZD;;AAcAJ;;AAEA;AACD,CA3BD;;AA6BA,MAAMK,aAAa/B,aAAa;AAC9B,QAAMO,iBAAiBlB,WAAWmB,aAAX,CAAyB;AAC9CC,UAAMC,OAAN,EAAe;AACb;AACA,aAAON,QAAQO,GAAR,CAAY,CACjBD,QAAQE,WAAR,CAAoBZ,SAApB,EAA+Bf,oBAA/B,CADiB,EAEjByB,QAAQqB,UAAR,CAAmB/B,SAAnB,CAFiB,CAAZ,CAAP;AAID;AAP6C,GAAzB,CAAvB;;AAUA,SAAOO,cAAP;AACD,CAZD;;AAcA,MAAMyB,SAAS,MAAM;AACnB,QAAMzB,iBAAiBlB,WAAWmB,aAAX,CAAyB;AAC9CC,UAAMC,OAAN,EAAe;AACb;AACA,aAAOA,QAAQsB,MAAR,EAAP;AACD;AAJ6C,GAAzB,CAAvB;;AAOA,SAAOzB,cAAP;AACD,CATD;;AAWA0B,OAAOC,OAAP,GAAiB;AACfT,SADe;AAEf3B,SAFe;AAGfiC,YAHe;AAIfC;AAJe,CAAjB","file":"index.js","sourcesContent":["import AmqpConnectionManager from './AmqpConnectionManager';\nimport path from 'path';\n\n/** check for logger in the global scope. */\nconst logger = global.logger ? global.logger : { log: console.log, error: console.error };\n\nfunction connect(urls, options) {\n return new AmqpConnectionManager(urls, options);\n}\n\nconst env = process.env.NODE_ENV || 'development';\nconst currentPath = process.cwd();\n\nconst config = require(path.join(currentPath, 'config', 'env', `${env}`));\n\nconst {\n host,\n port,\n username,\n password,\n vhost = '/',\n protocol = 'amqp',\n prefetch = 2,\n heartbeatInterval = 5,\n reconnectTime = 10,\n options = {},\n defaultQueueFeatures = { durable: true }\n} = config.rabbitMQ;\n\n// Handle an incomming message.\n// const onMessage = function(channelWrapper, data) {\n// const message = JSON.parse(data.content.toString());\n// console.log(\"receiver: got message\", message);\n// channelWrapper.ack(data);\n// }\n\nconst connectionUrl = `${protocol}://${username}:${password}@${host}:${port}/${vhost}`;\n\n/**\n * options.heartbeatIntervalInSeconds - Interval to send heartbeats to broker. Defaults to 5 seconds.\n * options.reconnectTimeInSeconds - The time to wait before trying to reconnect. If not specified, defaults to heartbeatIntervalInSeconds.\n * options.findServers(callback) is a function which returns one or more servers to connect to. This should return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism such as Consul or etcd. Instead of taking a callback, this can also return a Promise. Note that if this is supplied, then urls is ignored.\n * options.connectionOptions is passed as options to the amqplib connect method.\n */\n\n// Create a connetion manager\nconst connection = connect(\n [connectionUrl],\n {\n json: true,\n heartbeatIntervalInSeconds: heartbeatInterval,\n reconnectTimeInSeconds: reconnectTime,\n connectionOptions: options\n }\n);\n\nconnection.on('connect', () => {\n logger.log('data', { note: 'Connected to RabbitMQ server' });\n});\n\nconnection.on('disconnect', params => {\n logger.log('error', { error: params.err, note: 'RabbitMQ server is disconnected' });\n});\n\n/**\n * Consumer.\n *\n * @param {object} params - object with queue name and queue options.\n * @param {function} [handler] - callback.\n * @returns {void | Promise} - Resolves when complete.\n */\nconst consume = (params = {}, handler) => {\n const queueName = params.queue && params.queue.name;\n const queueOptions = params.queue.options || defaultQueueFeatures;\n\n if (!queueName) {\n return Promise.reject(new Error('Queue name is missing'));\n }\n\n // Set up a channel listening for messages in the queue.\n const channelWrapper = connection.createChannel({\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return Promise.all([\n channel.assertQueue(queueName, queueOptions),\n channel.prefetch(prefetch),\n // channel.consume(queueName, handler.bind(null, channelWrapper))\n channel.consume(\n queueName,\n data => {\n const message = JSON.parse(data.content.toString());\n\n handler(message).then(() => channelWrapper.ack(data)).catch(() => {});\n },\n { noAck: false }\n )\n ]).catch(e => {\n logger.log('error', { error: e, note: 'error from consume' });\n });\n }\n });\n\n /** start the consumer */\n return channelWrapper.waitForConnect().then(() => {\n logger.log('data', { note: `Consumption from ${queueName} started!` });\n });\n};\n\n/**\n * Publisher.\n *\n * @param {string} queueName - name of queue.\n * @param {object} [data] - data to be published.\n */\nconst publish = (queueName, data) => {\n const channelWrapper = connection.createChannel({\n json: true,\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return channel.assertQueue(queueName, { durable: true });\n }\n });\n\n // Send messages until someone hits CTRL-C or something goes wrong...\n const startPublishing = () => {\n channelWrapper\n .sendToQueue(queueName, data, { persistent: true })\n .then(() => {\n logger.log('data', { note: `Message sent to queue ${queueName}` });\n return null;\n })\n .catch(err => {\n logger.log('error', { note: 'Message was rejected', error: err, custom: { data }});\n channelWrapper.close();\n connection.close();\n });\n };\n\n startPublishing();\n\n // return sendMessage;\n};\n\nconst purgeQueue = queueName => {\n const channelWrapper = connection.createChannel({\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return Promise.all([\n channel.assertQueue(queueName, defaultQueueFeatures),\n channel.purgeQueue(queueName)\n ]);\n }\n });\n\n return channelWrapper;\n};\n\nconst ackAll = () => {\n const channelWrapper = connection.createChannel({\n setup(channel) {\n // `channel` here is a regular amqplib `ConfirmChannel`.\n return channel.ackAll();\n }\n });\n\n return channelWrapper;\n};\n\nmodule.exports = {\n publish,\n consume,\n purgeQueue,\n ackAll\n};\n"]} \ No newline at end of file