Skip to content

Commit

Permalink
redis snapshots are retrieved recursively to match 'max revision' opt…
Browse files Browse the repository at this point in the history
…ion (#89)

snapshots where loaded all together to only use the latest with revMax === -1, causing performance issues
now snapshots are loaded from redis recusively to find the one that match revMax option
  • Loading branch information
rehia authored and adrai committed Nov 2, 2016
1 parent 6680e02 commit c0befc9
Showing 1 changed file with 24 additions and 27 deletions.
51 changes: 24 additions & 27 deletions lib/databases/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function Redis(options) {
util.inherits(Redis, Store);

// helpers
function handleResultSet (err, res, callback) {
function handleResultSet(err, res, callback) {
if (err) {
debug(err);
return callback(err);
Expand All @@ -63,7 +63,7 @@ function handleResultSet (err, res, callback) {
}
var arr = [];

res.forEach(function(item) {
res.forEach(function (item) {
arr.push(jsondate.parse(item));
});

Expand Down Expand Up @@ -145,7 +145,7 @@ _.extend(Redis.prototype, {
this.heartbeatInterval = setInterval(function () {
var graceTimer = setTimeout(function () {
if (self.heartbeatInterval) {
console.error((new Error ('Heartbeat timeouted after ' + gracePeriod + 'ms (redis)')).stack);
console.error((new Error('Heartbeat timeouted after ' + gracePeriod + 'ms (redis)')).stack);
self.disconnect();
}
}, gracePeriod);
Expand Down Expand Up @@ -177,7 +177,7 @@ _.extend(Redis.prototype, {
self.client.del('nextItemId:' + self.options.prefix, callback);
},
function (callback) {
self.client.keys(self.options.prefix + ':*', function(err, keys) {
self.client.keys(self.options.prefix + ':*', function (err, keys) {
if (err) {
return callback(err);
}
Expand All @@ -194,8 +194,8 @@ _.extend(Redis.prototype, {
});
},

getNewId: function(callback) {
this.client.incr('nextItemId:' + this.options.prefix, function(err, id) {
getNewId: function (callback) {
this.client.incr('nextItemId:' + this.options.prefix, function (err, id) {
if (err) {
debug(err);
return callback(err);
Expand Down Expand Up @@ -251,13 +251,13 @@ _.extend(Redis.prototype, {
cursor = 0;
}

(function scanRecursive (curs) {
(function scanRecursive(curs) {
self.client.scan(curs, 'match', key, function (err, res) {
if (err) {
return callback(err);
}

function next () {
function next() {
if (res[0] === '0') {
callback(null);
} else {
Expand Down Expand Up @@ -532,7 +532,7 @@ _.extend(Redis.prototype, {
);
},

addSnapshot: function(snap, callback) {
addSnapshot: function (snap, callback) {
if (!snap.aggregateId) {
var errMsg = 'aggregateId not defined!';
debug(errMsg);
Expand Down Expand Up @@ -580,37 +580,34 @@ _.extend(Redis.prototype, {
return s;
}).reverse();

if (revMax > -1) {
allKeys = allKeys.slice(0, revMax);
if (revMax === -1) { // by default the last snapshot is kept
allKeys = allKeys.slice(0, 1);
}

if (allKeys.length === 0) {
return callback(null, null);
}

async.map(allKeys, function (key, callback) {
// iterating recursively over snapshots, from latest to oldest
(function nextSnapshot(key) {
self.client.get(key, function (err, res) {
if (err) {
debug(err);
return callback(err);
}

callback(null, jsondate.parse(res));
});
}, function (err, res) {
if (err) {
debug(err);
return callback(err);
}

var found = _.find(res, function (s) {
if (revMax > -1 && s.revision > revMax) {
return false;
var snapshot = jsondate.parse(res);
if (revMax > -1 && snapshot.revision > revMax) {
if (allKeys.length > 0) {
nextSnapshot(allKeys.shift());
} else {
callback(null, null);
}
} else {
callback(null, snapshot);
}
return true;
});

callback(null, found);
});
})(allKeys.shift());
}
);
}
Expand Down

0 comments on commit c0befc9

Please sign in to comment.