Skip to content

Commit

Permalink
first pass implementation for issue #169
Browse files Browse the repository at this point in the history
  • Loading branch information
emadum committed Feb 4, 2018
1 parent 6bc808a commit fa328ce
Showing 1 changed file with 36 additions and 15 deletions.
51 changes: 36 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const ONE_WEEK_IN_SECONDS = 604800;
const PRESENCE_UPDATE_INTERVAL = ONE_SECOND;
const HEALTH_UPDATE_INTERVAL = 5000;
const KEY_EXPIRATION_TTL = parseInt(PRESENCE_UPDATE_INTERVAL / ONE_SECOND) * 3;
const MESSAGE_TTL = 30;
const KEYS_PER_SCAN = '100';
const UMF_INVALID_MESSAGE = 'UMF message requires "to", "from" and "body" fields';

Expand Down Expand Up @@ -524,23 +525,11 @@ class Hydra extends EventEmitter {
// Setup service message courier channels
this.mcMessageChannelClient = this.testMode ? testRedis.createClient() : this.redisdb.duplicate();
this.mcMessageChannelClient.subscribe(`${mcMessageKey}:${serviceName}`);
this.mcMessageChannelClient.on('message', (channel, message) => {
let msg = Utils.safeJSONParse(message);
if (msg) {
let umfMsg = UMFMessage.createMessage(msg);
this.emit('message', umfMsg.toShort());
}
});
this.mcMessageChannelClient.on('message', (channel, messageId) => this._onMessage(channel, messageId));

this.mcDirectMessageChannelClient = this.testMode ? testRedis.createClient() : this.redisdb.duplicate();
this.mcDirectMessageChannelClient.subscribe(`${mcMessageKey}:${serviceName}:${this.instanceID}`);
this.mcDirectMessageChannelClient.on('message', (channel, message) => {
let msg = Utils.safeJSONParse(message);
if (msg) {
let umfMsg = UMFMessage.createMessage(msg);
this.emit('message', umfMsg.toShort());
}
});
this.mcDirectMessageChannelClient.on('message', (channel, messageId) => this._onMessage(channel, messageId));

// Schedule periodic updates
this.presenceTimerInteval = setInterval(this._updatePresence, PRESENCE_UPDATE_INTERVAL);
Expand All @@ -559,6 +548,27 @@ class Hydra extends EventEmitter {
});
}

/**
* @name _onMessage
* @summary Handle pub/sub messages
* @private
* @param {string} channel - channel
* @param {string} messageId - UMF mid
*/
_onMessage(channel, messageId) {
this.redisdb.get(`${redisPreKey}:message:${messageId}`, (err, _result) => {
if (err) {
this._logMessage('error', err.message);
return;
}
let msg = Utils.safeJSONParse(_result);
if (msg) {
let umfMsg = UMFMessage.createMessage(msg);
this.emit('message', umfMsg.toShort());
}
});
}

/**
* @name _registerRoutes
* @summary Register routes
Expand Down Expand Up @@ -1357,7 +1367,18 @@ class Hydra extends EventEmitter {
if (messageChannel) {
let msg = UMFMessage.createMessage(message);
let strMessage = Utils.safeJSONStringify(msg.toShort());
messageChannel.publish(channel, strMessage);
const messageId = message.mid;
this.redisdb.setex(
`${redisPreKey}:message:${messageId}`,
MESSAGE_TTL,
strMessage,
(err, result) => {
if (err) {
throw err;
}
messageChannel.publish(channel, messageId);
}
);
}
}

Expand Down

0 comments on commit fa328ce

Please sign in to comment.