diff --git a/lib/databases/redis.js b/lib/databases/redis.js index 2c8672ba..5e016e67 100644 --- a/lib/databases/redis.js +++ b/lib/databases/redis.js @@ -52,7 +52,7 @@ function Redis(options) { util.inherits(Redis, Store); // helpers -function handleResultSet (err, res, callback) { +function handleResultSet(err, res, callback) { if (err) { debug(err); return callback(err); @@ -63,7 +63,7 @@ function handleResultSet (err, res, callback) { } var arr = []; - res.forEach(function(item) { + res.forEach(function (item) { arr.push(jsondate.parse(item)); }); @@ -145,7 +145,7 @@ _.extend(Redis.prototype, { this.heartbeatInterval = setInterval(function () { var graceTimer = setTimeout(function () { if (self.heartbeatInterval) { - console.error((new Error ('Heartbeat timeouted after ' + gracePeriod + 'ms (redis)')).stack); + console.error((new Error('Heartbeat timeouted after ' + gracePeriod + 'ms (redis)')).stack); self.disconnect(); } }, gracePeriod); @@ -177,7 +177,7 @@ _.extend(Redis.prototype, { self.client.del('nextItemId:' + self.options.prefix, callback); }, function (callback) { - self.client.keys(self.options.prefix + ':*', function(err, keys) { + self.client.keys(self.options.prefix + ':*', function (err, keys) { if (err) { return callback(err); } @@ -194,8 +194,8 @@ _.extend(Redis.prototype, { }); }, - getNewId: function(callback) { - this.client.incr('nextItemId:' + this.options.prefix, function(err, id) { + getNewId: function (callback) { + this.client.incr('nextItemId:' + this.options.prefix, function (err, id) { if (err) { debug(err); return callback(err); @@ -251,13 +251,13 @@ _.extend(Redis.prototype, { cursor = 0; } - (function scanRecursive (curs) { + (function scanRecursive(curs) { self.client.scan(curs, 'match', key, function (err, res) { if (err) { return callback(err); } - function next () { + function next() { if (res[0] === '0') { callback(null); } else { @@ -532,7 +532,7 @@ _.extend(Redis.prototype, { ); }, - addSnapshot: function(snap, callback) { + addSnapshot: function (snap, callback) { if (!snap.aggregateId) { var errMsg = 'aggregateId not defined!'; debug(errMsg); @@ -580,37 +580,34 @@ _.extend(Redis.prototype, { return s; }).reverse(); - if (revMax > -1) { - allKeys = allKeys.slice(0, revMax); + if (revMax === -1) { // by default the last snapshot is kept + allKeys = allKeys.slice(0, 1); } if (allKeys.length === 0) { return callback(null, null); } - async.map(allKeys, function (key, callback) { + // iterating recursively over snapshots, from latest to oldest + (function nextSnapshot(key) { self.client.get(key, function (err, res) { if (err) { + debug(err); return callback(err); } - callback(null, jsondate.parse(res)); - }); - }, function (err, res) { - if (err) { - debug(err); - return callback(err); - } - - var found = _.find(res, function (s) { - if (revMax > -1 && s.revision > revMax) { - return false; + var snapshot = jsondate.parse(res); + if (revMax > -1 && snapshot.revision > revMax) { + if (allKeys.length > 0) { + nextSnapshot(allKeys.shift()); + } else { + callback(null, null); + } + } else { + callback(null, snapshot); } - return true; }); - - callback(null, found); - }); + })(allKeys.shift()); } ); }