diff --git a/src/connection.js b/src/connection.js index 277207c84..44ab7b3a3 100644 --- a/src/connection.js +++ b/src/connection.js @@ -577,7 +577,9 @@ class Connection extends EventEmitter { if (this.config.options.rowCollectionOnDone) { this.request.rst.push(token.columns); } - return this.request.emit('row', token.columns); + if (!(this.state === this.STATE.SENT_ATTENTION && this.request.paused)) { + this.request.emit('row', token.columns); + } } else { this.emit('error', new Error("Received 'row' when no sqlRequest is in progress")); return this.close(); @@ -641,6 +643,12 @@ class Connection extends EventEmitter { } }); + this.tokenStreamParser.on('endOfMessage', () => { // EOM pseudo token received + if (this.state === this.STATE.SENT_CLIENT_REQUEST) { + this.dispatchEvent('endOfMessageMarkerReceived'); + } + }); + this.tokenStreamParser.on('resetConnection', () => { return this.emit('resetConnection'); }); @@ -649,6 +657,12 @@ class Connection extends EventEmitter { this.emit('error', error); return this.close(); }); + + this.tokenStreamParser.on('drain', () => { + // Bridge the release of backpressure from the token stream parser + // transform to the packet stream transform. + this.messageIo.resume(); + }); } connect() { @@ -766,7 +780,7 @@ class Connection extends EventEmitter { } if (this.state && this.state.exit) { - this.state.exit.apply(this); + this.state.exit.call(this, newState); } this.debug.log('State change: ' + (this.state ? this.state.name : undefined) + ' -> ' + newState.name); @@ -973,10 +987,33 @@ class Connection extends EventEmitter { } } + // Returns false to apply backpressure. sendDataToTokenStreamParser(data) { return this.tokenStreamParser.addBuffer(data); } + // This is an internal method that is called from Request.pause(). + // It has to check whether the passed Request object represents the currently + // active request, because the application might have called Request.pause() + // on an old inactive Request object. + pauseRequest(request) { + if (this.isRequestActive(request)) { + this.tokenStreamParser.pause(); + } + } + + // This is an internal method that is called from Request.resume(). + resumeRequest(request) { + if (this.isRequestActive(request)) { + this.tokenStreamParser.resume(); + } + } + + // Returns true if the passed request is the currently active request of the connection. + isRequestActive(request) { + return request === this.request && this.state === this.STATE.SENT_CLIENT_REQUEST; + } + sendInitialSql() { const payload = new SqlBatchPayload(this.getInitialSql(), this.currentTransactionDescriptor(), this.config.options); return this.messageIo.sendMessage(TYPE.SQL_BATCH, payload.data); @@ -1266,6 +1303,7 @@ class Connection extends EventEmitter { } this.request = request; + this.request.connection = this; this.request.rowCount = 0; this.request.rows = []; this.request.rst = []; @@ -1275,7 +1313,10 @@ class Connection extends EventEmitter { this.debug.payload(function() { return payload.toString(' '); }); - return this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); + this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); + if (request.paused) { // Request.pause() has been called before the request was started + this.pauseRequest(request); + } } } @@ -1561,8 +1602,12 @@ Connection.prototype.STATE = { }, SENT_CLIENT_REQUEST: { name: 'SentClientRequest', - exit: function() { + exit: function(nextState) { this.clearRequestTimer(); + + if (nextState !== this.STATE.FINAL) { + this.tokenStreamParser.resume(); + } }, events: { socketError: function(err) { @@ -1573,9 +1618,20 @@ Connection.prototype.STATE = { }, data: function(data) { this.clearRequestTimer(); // request timer is stopped on first data package - return this.sendDataToTokenStreamParser(data); + const ret = this.sendDataToTokenStreamParser(data); + if (ret === false) { + // Bridge backpressure from the token stream parser transform to the + // packet stream transform. + this.messageIo.pause(); + } }, message: function() { + // We have to channel the 'message' (EOM) event through the token stream + // parser transform, to keep it in line with the flow of the tokens, when + // the incoming data flow is paused and resumed. + return this.tokenStreamParser.addEndOfMessageMarker(); + }, + endOfMessageMarkerReceived: function() { this.transitionTo(this.STATE.LOGGED_IN); const sqlRequest = this.request; this.request = undefined; diff --git a/src/message-io.js b/src/message-io.js index cad41c8c3..ce3a15ead 100644 --- a/src/message-io.js +++ b/src/message-io.js @@ -179,4 +179,14 @@ module.exports = class MessageIO extends EventEmitter { this.debug.packet(direction, packet); return this.debug.data(packet); } + + // Temporarily suspends the flow of incoming packets. + pause() { + this.packetStream.pause(); + } + + // Resumes the flow of incoming packets. + resume() { + this.packetStream.resume(); + } }; diff --git a/src/request.js b/src/request.js index 67dc5c841..21588ab93 100644 --- a/src/request.js +++ b/src/request.js @@ -7,10 +7,11 @@ module.exports = class Request extends EventEmitter { super(); this.sqlTextOrProcedure = sqlTextOrProcedure; - this.callback = callback; this.parameters = []; this.parametersByName = {}; - this.userCallback = this.callback; + this.canceled = false; + this.paused = false; + this.userCallback = callback; this.callback = function() { if (this.preparing) { this.emit('prepared'); @@ -134,4 +135,27 @@ module.exports = class Request extends EventEmitter { } return null; } + + // Temporarily suspends the flow of data from the database. + // No more 'row' events will be emitted until resume() is called. + pause() { + if (this.paused) { + return; + } + this.paused = true; + if (this.connection) { + this.connection.pauseRequest(this); + } + } + + // Resumes the flow of data from the database. + resume() { + if (!this.paused) { + return; + } + this.paused = false; + if (this.connection) { + this.connection.resumeRequest(this); + } + } }; diff --git a/src/token/stream-parser.js b/src/token/stream-parser.js index f68fde41c..d7280d806 100644 --- a/src/token/stream-parser.js +++ b/src/token/stream-parser.js @@ -24,6 +24,7 @@ module.exports = class Parser extends Transform { this.debug = debug; this.colMetadata = colMetadata; this.options = options; + this.endOfMessageMarker = {}; this.buffer = new Buffer(0); this.position = 0; @@ -33,6 +34,13 @@ module.exports = class Parser extends Transform { } _transform(input, encoding, done) { + if (input === this.endOfMessageMarker) { + done(null, { // generate endOfMessage pseudo token + name: 'EOM', + event: 'endOfMessage' + }); + return; + } if (this.position === this.buffer.length) { this.buffer = input; } else { diff --git a/src/token/token-stream-parser.js b/src/token/token-stream-parser.js index 859e7effb..6368f1bbf 100644 --- a/src/token/token-stream-parser.js +++ b/src/token/token-stream-parser.js @@ -25,14 +25,36 @@ class Parser extends EventEmitter { this.emit(token.event, token); } }); + this.parser.on('drain', () => { + this.emit('drain'); + }); } + // Returns false to apply backpressure. addBuffer(buffer) { return this.parser.write(buffer); } + // Writes an end-of-message (EOM) marker into the parser transform input + // queue. StreamParser will emit a 'data' event with an 'endOfMessage' + // pseudo token when the EOM marker has passed through the transform stream. + // Returns false to apply backpressure. + addEndOfMessageMarker() { + return this.parser.write(this.parser.endOfMessageMarker); + } + isEnd() { return this.parser.buffer.length === this.parser.position; } + + // Temporarily suspends the token stream parser transform from emitting events. + pause() { + this.parser.pause(); + } + + // Resumes the token stream parser transform. + resume() { + this.parser.resume(); + } } module.exports.Parser = Parser; diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js new file mode 100644 index 000000000..2401f3a79 --- /dev/null +++ b/test/integration/pause-resume-test.js @@ -0,0 +1,198 @@ +const fs = require('fs'); + +const Connection = require('../../src/connection'); +const Request = require('../../src/request'); + +function getConfig() { + const config = JSON.parse(fs.readFileSync(process.env.HOME + '/.tedious/test-connection.json', 'utf8')).config; + config.options.tdsVersion = process.env.TEDIOUS_TDS_VERSION; + // 250 ms timeout until the first response package is received + config.options.requestTimeout = 250; + return config; +} + +exports.setUp = function(done) { + this.connection = new Connection(getConfig()); + this.connection.on('connect', done); +}; + +exports.tearDown = function(done) { + if (this.connection.closed) { + done(); + } else { + this.connection.on('end', done); + this.connection.close(); + } +}; + +exports.testPausedRequestDoesNotEmitRowsAfterConnectionClose = function(test) { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < 20000) + select i from cte1 option (maxrecursion 0) + `; + + const request = new Request(sql, (error) => { + test.ok(error); + }); + + request.on('row', (columns) => { + if (columns[0].value == 1000) { + request.pause(); + + setTimeout(() => { + this.connection.on('end', () => { + process.nextTick(() => { + test.done(); + }); + }); + this.connection.close(); + }, 200); + } + }); + + this.connection.execSql(request); +}; + +exports.testPausedRequestCanBeResumed = function(test) { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < 20000) + select i from cte1 option (maxrecursion 0) + `; + + let rowsReceived = 0; + let paused = false; + + const request = new Request(sql, (error) => { + test.ifError(error); + + test.strictEqual(rowsReceived, 20000); + + test.done(); + }); + + request.on('row', (columns) => { + test.ok(!paused); + + rowsReceived++; + + test.strictEqual(columns[0].value, rowsReceived); + + if (columns[0].value == 1000) { + paused = true; + request.pause(); + + setTimeout(() => { + paused = false; + request.resume(); + }, 200); + } + }); + + this.connection.execSql(request); +}; + +exports.testPausingRequestPausesTransforms = function(test) { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < 20000) + select i from cte1 option (maxrecursion 0) + `; + + const request = new Request(sql, (error) => { + test.ifError(error); + + test.done(); + }); + + request.on('row', (columns) => { + if (columns[0].value == 1000) { + request.pause(); + + setTimeout(() => { + test.ok(this.connection.messageIo.packetStream.isPaused()); + test.ok(this.connection.tokenStreamParser.parser.isPaused()); + + request.resume(); + }, 200); + } + }); + + this.connection.execSql(request); +}; + +exports.testPausedRequestCanBeCancelled = function(test) { + this.connection.on('error', (err) => { + test.ifError(err); + }); + + const pauseAndCancelRequest = (next) => { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < 20000) + select i from cte1 option (maxrecursion 0) + `; + + const request = new Request(sql, (error) => { + test.ok(error); + test.strictEqual(error.code, 'ECANCEL'); + + next(); + }); + + request.on('row', (columns) => { + if (columns[0].value == 1000) { + request.pause(); + + setTimeout(() => { + this.connection.cancel(); + }, 200); + } else if (columns[0].value > 1000) { + test.ok(false, 'Received rows after pause'); + } + }); + + this.connection.execSql(request); + }; + + pauseAndCancelRequest(() => { + const request = new Request('SELECT 1', (error) => { + test.ifError(error); + test.done(); + }); + + request.on('row', (columns) => { + test.strictEqual(columns[0].value, 1); + }); + + this.connection.execSql(request); + }); +}; + +exports.testImmediatelyPausedRequestDoesNotEmitRowsUntilResumed = function(test) { + this.connection.on('error', (err) => { + test.ifError(err); + }); + + const request = new Request('SELECT 1', (error) => { + test.ifError(error); + test.done(); + }); + + let paused = true; + request.pause(); + + request.on('row', (columns) => { + test.ok(!paused); + + test.strictEqual(columns[0].value, 1); + }); + + this.connection.execSql(request); + + setTimeout(() => { + paused = false; + request.resume(); + }, 200); +};