From e0b188363dd561478c7bbf17245c17cec1950095 Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 13 Mar 2016 14:49:33 +0800 Subject: [PATCH] feat(transaction): transform replies of transactions BREAKING CHANGE: 1. Reply transformers is supported inside transactions. 2. `Pipeline#execBuffer()` is deprecated. Use `Pipeline#exec()` instead. Closes #158. --- lib/command.js | 30 ++++++++++++++-------- lib/pipeline.js | 24 ++++++++++++++---- test/functional/transaction.js | 46 +++++++++++++++++++++++++++------- 3 files changed, 75 insertions(+), 25 deletions(-) diff --git a/lib/command.js b/lib/command.js index ad4977c2..8ef9d95a 100644 --- a/lib/command.js +++ b/lib/command.js @@ -200,18 +200,8 @@ Command.prototype.stringifyArguments = function () { Command.prototype._convertValue = function (resolve) { var _this = this; return function (value) { - // Convert buffer/buffer[] to string/string[] - var result = value; - var transformer; try { - if (_this.replyEncoding) { - result = utils.convertBufferToString(value, _this.replyEncoding); - } - transformer = Command._transformer.reply[_this.name]; - if (transformer) { - result = transformer(result); - } - resolve(result); + resolve(_this.transformReply(value)); } catch (err) { _this.reject(err); } @@ -219,6 +209,24 @@ Command.prototype._convertValue = function (resolve) { }; }; +/** + * Convert buffer/buffer[] to string/string[], + * and apply reply transformer. + * + * @public + */ +Command.prototype.transformReply = function (result) { + if (this.replyEncoding) { + result = utils.convertBufferToString(result, this.replyEncoding); + } + var transformer = Command._transformer.reply[this.name]; + if (transformer) { + result = transformer(result); + } + + return result; +}; + Command.FLAGS = { // Commands that can be processed when Redis is loading data from disk VALID_WHEN_LOADING: ['info', 'auth', 'select', 'subscribe', 'unsubscribe', 'psubscribe', diff --git a/lib/pipeline.js b/lib/pipeline.js index 10b4a46d..436775e6 100644 --- a/lib/pipeline.js +++ b/lib/pipeline.js @@ -5,6 +5,7 @@ var Command = require('./command'); var fbuffer = require('flexbuffer'); var Promise = require('bluebird'); var utils = require('./utils'); +var util = require('util'); var commands = require('redis-commands'); function Pipeline(redis) { @@ -35,13 +36,27 @@ function Pipeline(redis) { _.assign(Pipeline.prototype, Commander.prototype); Pipeline.prototype.fillResult = function (value, position) { + var i; + if (this._queue[position].name === 'exec' && Array.isArray(value[1])) { + var execLength = value[1].length; + for (i = 0; i < execLength; i++) { + if (value[1][i] instanceof Error) { + continue; + } + var cmd = this._queue[position - (execLength - i)]; + try { + value[1][i] = cmd.transformReply(value[1][i]); + } catch (err) { + value[1][i] = err; + } + } + } this._result[position] = value; if (--this.replyPending) { return; } - var i; if (this.isCluster) { var retriable = true; var commonError; @@ -175,18 +190,17 @@ Pipeline.prototype.multi = function () { }; var execBuffer = Pipeline.prototype.execBuffer; -Pipeline.prototype.execBuffer = function () { +Pipeline.prototype.execBuffer = util.deprecate(function () { if (this._transactions > 0) { this._transactions -= 1; } return execBuffer.apply(this, arguments); -}; +}, 'Pipeline#execBuffer: Use Pipeline#exec instead'); -var exec = Pipeline.prototype.exec; Pipeline.prototype.exec = function (callback) { if (this._transactions > 0) { this._transactions -= 1; - return exec.apply(this, arguments); + return execBuffer.apply(this, arguments); } if (!this.nodeifiedPromise) { this.nodeifiedPromise = true; diff --git a/test/functional/transaction.js b/test/functional/transaction.js index f9a03a37..21e2f249 100644 --- a/test/functional/transaction.js +++ b/test/functional/transaction.js @@ -24,7 +24,7 @@ describe('transaction', function () { it('should handle compile-time errors correctly', function (done) { var redis = new Redis(); - redis.multi().set('foo').get('foo').exec(function (err, result) { + redis.multi().set('foo').get('foo').exec(function (err) { expect(err).to.be.instanceof(Error); expect(err.toString()).to.match(/Transaction discarded because of previous errors/); done(); @@ -69,14 +69,42 @@ describe('transaction', function () { }); }); - it('should support execBuffer', function (done) { - var redis = new Redis(); - redis.multi().set('foo', 'bar').get('foo').execBuffer(function (err, res) { - expect(res[0][1]).to.be.instanceof(Buffer); - expect(res[0][1].toString()).to.eql('OK'); - expect(res[1][1]).to.be.instanceof(Buffer); - expect(res[1][1].toString()).to.eql('bar'); - done(); + describe('transformer', function () { + it('should trigger transformer', function (done) { + var redis = new Redis(); + var pending = 2; + var data = { name: 'Bob', age: '17' }; + redis.multi().hmset('foo', data).hgetall('foo', function (err, res) { + expect(res).to.eql('QUEUED'); + if (!--pending) { + done(); + } + }).hgetallBuffer('foo').exec(function (err, res) { + expect(res[0][1]).to.eql('OK'); + expect(res[1][1]).to.eql(data); + expect(res[2][1]).to.eql({ + name: new Buffer('Bob'), + age: new Buffer('17') + }); + if (!--pending) { + done(); + } + }); + }); + + it('should trigger transformer inside pipeline', function (done) { + var redis = new Redis(); + var data = { name: 'Bob', age: '17' }; + redis.pipeline().hmset('foo', data).multi().typeBuffer('foo') + .hgetall('foo').exec().hgetall('foo').exec(function (err, res) { + expect(res[0][1]).to.eql('OK'); + expect(res[1][1]).to.eql('OK'); + expect(res[2][1]).to.eql(new Buffer('QUEUED')); + expect(res[3][1]).to.eql('QUEUED'); + expect(res[4][1]).to.eql([new Buffer('hash'), data]); + expect(res[5][1]).to.eql(data); + done(); + }); }); });