diff --git a/API.md b/API.md
index c8b54c47..23066791 100644
--- a/API.md
+++ b/API.md
@@ -221,6 +221,7 @@ Creates a Redis Cluster instance
| [options.maxRedirections] | number
| 16
| When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. |
| [options.retryDelayOnFailover] | number
| 100
| When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), |
| [options.retryDelayOnClusterDown] | number
| 100
| When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. |
+| [options.retryDelayOnTryAgain] | number
| 100
| When a TRYAGAIN error is received, client will retry if `retryDelayOnTryAgain` is valid delay time. |
| [options.redisOptions] | Object
| | Passed to the constructor of `Redis`. |
diff --git a/README.md b/README.md
index 7129fc92..7bcd413d 100644
--- a/README.md
+++ b/README.md
@@ -678,6 +678,8 @@ but a few so that if one is unreachable the client will try the next one, and th
to insure that no command will fail during a failover.
* `retryDelayOnClusterDown`: When a cluster is down, all commands will be rejected with the error of `CLUSTERDOWN`. If this option is a number (by default, it is `100`), the client
will resend the commands after the specified time (in ms).
+ * `retryDelayOnTryAgain`: If this option is a number (by default, it is `100`), the client
+ will resend the commands rejected with `TRYAGAIN` error after the specified time (in ms).
* `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node.
### Read-write splitting
diff --git a/lib/cluster/delay_queue.js b/lib/cluster/delay_queue.js
new file mode 100644
index 00000000..77249c96
--- /dev/null
+++ b/lib/cluster/delay_queue.js
@@ -0,0 +1,48 @@
+'use strict';
+
+var Deque = require('double-ended-queue');
+var debug = require('debug')('ioredis:delayqueue');
+
+function DelayQueue() {
+ this.queues = {};
+ this.timeouts = {};
+}
+
+DelayQueue.prototype.push = function (bucket, item, options) {
+ var callback = options.callback || process.nextTick;
+ if (!this.queues[bucket]) {
+ this.queues[bucket] = new Deque();
+ }
+
+ var queue = this.queues[bucket];
+ queue.push(item);
+
+ if (!this.timeouts[bucket]) {
+ var _this = this;
+ this.timeouts[bucket] = setTimeout(function () {
+ callback(function () {
+ _this.timeouts[bucket] = null;
+ _this._execute(bucket);
+ });
+ }, options.timeout);
+ }
+};
+
+DelayQueue.prototype._execute = function (bucket) {
+ var queue = this.queues[bucket];
+ if (!queue) {
+ return;
+ }
+ var length = queue.length;
+ if (!length) {
+ return;
+ }
+ debug('send %d commands in %s queue', length, bucket);
+
+ this.queues[bucket] = null;
+ while (queue.length > 0) {
+ queue.shift()();
+ }
+};
+
+module.exports = DelayQueue;
diff --git a/lib/cluster/index.js b/lib/cluster/index.js
index 247d7202..dba62b53 100644
--- a/lib/cluster/index.js
+++ b/lib/cluster/index.js
@@ -13,6 +13,7 @@ var Commander = require('../commander');
var Command = require('../command');
var commands = require('redis-commands');
var ConnectionPool = require('./connection_pool');
+var DelayQueue = require('./delay_queue');
/**
* Creates a Redis Cluster instance
@@ -32,6 +33,8 @@ var ConnectionPool = require('./connection_pool');
* "Connection is closed." when the target Redis node is down),
* @param {number} [options.retryDelayOnClusterDown=100] - When a CLUSTERDOWN error is received, client will retry
* if `retryDelayOnClusterDown` is valid delay time.
+ * @param {number} [options.retryDelayOnTryAgain=100] - When a TRYAGAIN error is received, client will retry
+ * if `retryDelayOnTryAgain` is valid delay time.
* @param {Object} [options.redisOptions] - Passed to the constructor of `Redis`.
* @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
* @extends Commander
@@ -70,8 +73,7 @@ function Cluster(startupNodes, options) {
this.retryAttempts = 0;
this.resetOfflineQueue();
- this.resetFailoverQueue();
- this.resetClusterDownQueue();
+ this.delayQueue = new DelayQueue();
this.subscriber = null;
@@ -93,7 +95,8 @@ Cluster.defaultOptions = {
scaleReads: 'master',
maxRedirections: 16,
retryDelayOnFailover: 100,
- retryDelayOnClusterDown: 100
+ retryDelayOnClusterDown: 100,
+ retryDelayOnTryAgain: 100
};
util.inherits(Cluster, EventEmitter);
@@ -103,14 +106,6 @@ Cluster.prototype.resetOfflineQueue = function () {
this.offlineQueue = new Deque();
};
-Cluster.prototype.resetFailoverQueue = function () {
- this.failoverQueue = new Deque();
-};
-
-Cluster.prototype.resetClusterDownQueue = function () {
- this.clusterDownQueue = new Deque();
-};
-
/**
* Connect to a cluster
*
@@ -365,30 +360,6 @@ Cluster.prototype.executeOfflineCommands = function () {
}
};
-Cluster.prototype.executeFailoverCommands = function () {
- if (this.failoverQueue.length) {
- debug('send %d commands in failover queue', this.failoverQueue.length);
- var failoverQueue = this.failoverQueue;
- this.resetFailoverQueue();
- while (failoverQueue.length > 0) {
- var item = failoverQueue.shift();
- item();
- }
- }
-};
-
-Cluster.prototype.executeClusterDownCommands = function () {
- if (this.clusterDownQueue.length) {
- debug('send %d commands in cluster down queue', this.clusterDownQueue.length);
- var clusterDownQueue = this.clusterDownQueue;
- this.resetClusterDownQueue();
- while (clusterDownQueue.length > 0) {
- var item = clusterDownQueue.shift();
- item();
- }
- }
-};
-
Cluster.prototype.sendCommand = function (command, stream, node) {
if (this.status === 'end') {
command.reject(new Error('Connection is closed.'));
@@ -427,6 +398,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
debug('command %s is required to ask %s:%s', command.name, key);
tryConnection(false, key);
},
+ tryagain: partialTry,
clusterDown: partialTry,
connectionClosed: partialTry,
maxRedirections: function (redirectionError) {
@@ -511,7 +483,6 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
};
Cluster.prototype.handleError = function (error, ttl, handlers) {
- var _this = this;
if (typeof ttl.value === 'undefined') {
ttl.value = this.options.maxRedirections;
} else {
@@ -524,26 +495,20 @@ Cluster.prototype.handleError = function (error, ttl, handlers) {
var errv = error.message.split(' ');
if (errv[0] === 'MOVED' || errv[0] === 'ASK') {
handlers[errv[0] === 'MOVED' ? 'moved' : 'ask'](errv[1], errv[2]);
+ } else if (errv[0] === 'TRYAGAIN') {
+ this.delayQueue.push('tryagain', handlers.tryagain, {
+ timeout: this.options.retryDelayOnTryAgain
+ });
} else if (errv[0] === 'CLUSTERDOWN' && this.options.retryDelayOnClusterDown > 0) {
- this.clusterDownQueue.push(handlers.clusterDown);
- if (!this.clusterDownTimeout) {
- this.clusterDownTimeout = setTimeout(function () {
- _this.refreshSlotsCache(function () {
- _this.clusterDownTimeout = null;
- _this.executeClusterDownCommands();
- });
- }, this.options.retryDelayOnClusterDown);
- }
+ this.delayQueue.push('clusterdown', handlers.connectionClosed, {
+ timeout: this.options.retryDelayOnClusterDown,
+ callback: this.refreshSlotsCache.bind(this)
+ });
} else if (error.message === 'Connection is closed.' && this.options.retryDelayOnFailover > 0) {
- this.failoverQueue.push(handlers.connectionClosed);
- if (!this.failoverTimeout) {
- this.failoverTimeout = setTimeout(function () {
- _this.refreshSlotsCache(function () {
- _this.failoverTimeout = null;
- _this.executeFailoverCommands();
- });
- }, this.options.retryDelayOnFailover);
- }
+ this.delayQueue.push('failover', handlers.connectionClosed, {
+ timeout: this.options.retryDelayOnFailover,
+ callback: this.refreshSlotsCache.bind(this)
+ });
} else {
handlers.defaults();
}
diff --git a/lib/pipeline.js b/lib/pipeline.js
index 300096ce..10b4a46d 100644
--- a/lib/pipeline.js
+++ b/lib/pipeline.js
@@ -102,6 +102,9 @@ Pipeline.prototype.fillResult = function (value, position) {
if (typeof this.leftRedirections === 'undefined') {
this.leftRedirections = {};
}
+ var exec = function () {
+ _this.exec();
+ };
this.redis.handleError(commonError, this.leftRedirections, {
moved: function (slot, key) {
_this.preferKey = key;
@@ -113,12 +116,9 @@ Pipeline.prototype.fillResult = function (value, position) {
_this.preferKey = key;
_this.exec();
},
- clusterDown: function () {
- _this.exec();
- },
- connectionClosed: function () {
- _this.exec();
- },
+ tryagain: exec,
+ clusterDown: exec,
+ connectionClosed: exec,
maxRedirections: function () {
matched = false;
},
diff --git a/test/functional/cluster.js b/test/functional/cluster.js
index 3092f4b1..2d7acc5c 100644
--- a/test/functional/cluster.js
+++ b/test/functional/cluster.js
@@ -331,7 +331,7 @@ describe('cluster', function () {
var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
- ], { lazyConnect: false });
+ ]);
cluster.get('foo', function () {
cluster.get('foo');
});
@@ -361,7 +361,7 @@ describe('cluster', function () {
});
var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
- ], { lazyConnect: false });
+ ], { retryDelayOnFailover: 1 });
cluster.get('foo', function (err, res) {
expect(res).to.eql('bar');
cluster.disconnect();
@@ -453,6 +453,37 @@ describe('cluster', function () {
});
});
+ describe('TRYAGAIN', function () {
+ it('should retry the command', function (done) {
+ var times = 0;
+ var slotTable = [
+ [0, 16383, ['127.0.0.1', 30001]]
+ ];
+ var server = new MockServer(30001, function (argv) {
+ if (argv[0] === 'cluster' && argv[1] === 'slots') {
+ return slotTable;
+ }
+ if (argv[0] === 'get' && argv[1] === 'foo') {
+ if (times++ === 1) {
+ process.nextTick(function () {
+ cluster.disconnect();
+ disconnect([server], done);
+ });
+ } else {
+ return new Error('TRYAGAIN Multiple keys request during rehashing of slot');
+ }
+ }
+ });
+
+ var cluster = new Redis.Cluster([
+ { host: '127.0.0.1', port: '30001' }
+ ], { retryDelayOnTryAgain: 1 });
+ cluster.get('foo', function () {
+ cluster.get('foo');
+ });
+ });
+ });
+
describe('CLUSTERDOWN', function () {
it('should redirect the command to a random node', function (done) {
var slotTable = [
diff --git a/test/unit/cluster.js b/test/unit/cluster.js
index 770509e3..a0397c41 100644
--- a/test/unit/cluster.js
+++ b/test/unit/cluster.js
@@ -21,38 +21,4 @@ describe('cluster', function () {
expect(cluster.options).to.have.property('showFriendlyErrorStack', false);
expect(cluster.options).to.have.property('scaleReads', 'master');
});
-
- describe('#executeFailoverCommands', function () {
- it('should execute the commands', function (done) {
- var cluster = {
- resetFailoverQueue: function () {
- this.failoverQueue = [];
- },
- failoverQueue: []
- };
-
- cluster.failoverQueue.push(function () {
- expect(this.failoverQueue).to.have.length(0);
- done();
- }.bind(cluster));
- Cluster.prototype.executeFailoverCommands.call(cluster);
- });
- });
-
- describe('#executeClusterDownCommands', function () {
- it('should execute the commands', function (done) {
- var cluster = {
- resetClusterDownQueue: function () {
- this.clusterDownQueue = [];
- },
- clusterDownQueue: []
- };
-
- cluster.clusterDownQueue.push(function () {
- expect(this.clusterDownQueue).to.have.length(0);
- done();
- }.bind(cluster));
- Cluster.prototype.executeClusterDownCommands.call(cluster);
- });
- });
});