From fa328ce4af0182213a78e23fbf9072dcef6eb1a6 Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 4 Feb 2018 16:01:21 -0500 Subject: [PATCH] first pass implementation for issue #169 --- index.js | 51 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/index.js b/index.js index c3434ca..604158c 100755 --- a/index.js +++ b/index.js @@ -33,6 +33,7 @@ const ONE_WEEK_IN_SECONDS = 604800; const PRESENCE_UPDATE_INTERVAL = ONE_SECOND; const HEALTH_UPDATE_INTERVAL = 5000; const KEY_EXPIRATION_TTL = parseInt(PRESENCE_UPDATE_INTERVAL / ONE_SECOND) * 3; +const MESSAGE_TTL = 30; const KEYS_PER_SCAN = '100'; const UMF_INVALID_MESSAGE = 'UMF message requires "to", "from" and "body" fields'; @@ -524,23 +525,11 @@ class Hydra extends EventEmitter { // Setup service message courier channels this.mcMessageChannelClient = this.testMode ? testRedis.createClient() : this.redisdb.duplicate(); this.mcMessageChannelClient.subscribe(`${mcMessageKey}:${serviceName}`); - this.mcMessageChannelClient.on('message', (channel, message) => { - let msg = Utils.safeJSONParse(message); - if (msg) { - let umfMsg = UMFMessage.createMessage(msg); - this.emit('message', umfMsg.toShort()); - } - }); + this.mcMessageChannelClient.on('message', (channel, messageId) => this._onMessage(channel, messageId)); this.mcDirectMessageChannelClient = this.testMode ? testRedis.createClient() : this.redisdb.duplicate(); this.mcDirectMessageChannelClient.subscribe(`${mcMessageKey}:${serviceName}:${this.instanceID}`); - this.mcDirectMessageChannelClient.on('message', (channel, message) => { - let msg = Utils.safeJSONParse(message); - if (msg) { - let umfMsg = UMFMessage.createMessage(msg); - this.emit('message', umfMsg.toShort()); - } - }); + this.mcDirectMessageChannelClient.on('message', (channel, messageId) => this._onMessage(channel, messageId)); // Schedule periodic updates this.presenceTimerInteval = setInterval(this._updatePresence, PRESENCE_UPDATE_INTERVAL); @@ -559,6 +548,27 @@ class Hydra extends EventEmitter { }); } + /** + * @name _onMessage + * @summary Handle pub/sub messages + * @private + * @param {string} channel - channel + * @param {string} messageId - UMF mid + */ + _onMessage(channel, messageId) { + this.redisdb.get(`${redisPreKey}:message:${messageId}`, (err, _result) => { + if (err) { + this._logMessage('error', err.message); + return; + } + let msg = Utils.safeJSONParse(_result); + if (msg) { + let umfMsg = UMFMessage.createMessage(msg); + this.emit('message', umfMsg.toShort()); + } + }); + } + /** * @name _registerRoutes * @summary Register routes @@ -1357,7 +1367,18 @@ class Hydra extends EventEmitter { if (messageChannel) { let msg = UMFMessage.createMessage(message); let strMessage = Utils.safeJSONStringify(msg.toShort()); - messageChannel.publish(channel, strMessage); + const messageId = message.mid; + this.redisdb.setex( + `${redisPreKey}:message:${messageId}`, + MESSAGE_TTL, + strMessage, + (err, result) => { + if (err) { + throw err; + } + messageChannel.publish(channel, messageId); + } + ); } }