From 089ac3ecf8be3f739efadf6d3da2bb1a8bcae4d9 Mon Sep 17 00:00:00 2001 From: Christian d'Heureuse Date: Fri, 10 Mar 2017 03:32:45 +0100 Subject: [PATCH 01/13] Implements Request.pause() / resume() - v3 --- src/connection.js | 59 +++++- src/message-io.js | 10 ++ src/request.js | 28 ++- src/token/stream-parser.js | 8 + src/token/token-stream-parser.js | 22 +++ test/integration/pause-resume-test.js | 250 ++++++++++++++++++++++++++ 6 files changed, 372 insertions(+), 5 deletions(-) create mode 100644 test/integration/pause-resume-test.js diff --git a/src/connection.js b/src/connection.js index 277207c84..dc9395543 100644 --- a/src/connection.js +++ b/src/connection.js @@ -577,7 +577,9 @@ class Connection extends EventEmitter { if (this.config.options.rowCollectionOnDone) { this.request.rst.push(token.columns); } - return this.request.emit('row', token.columns); + if (!(this.state === this.STATE.SENT_ATTENTION && this.request.paused)) { + this.request.emit('row', token.columns); + } } else { this.emit('error', new Error("Received 'row' when no sqlRequest is in progress")); return this.close(); @@ -641,6 +643,12 @@ class Connection extends EventEmitter { } }); + this.tokenStreamParser.on('endOfMessage', () => { // EOM pseudo token received + if (this.state === this.STATE.SENT_CLIENT_REQUEST) { + this.dispatchEvent('endOfMessageMarkerReceived'); + } + }); + this.tokenStreamParser.on('resetConnection', () => { return this.emit('resetConnection'); }); @@ -649,6 +657,12 @@ class Connection extends EventEmitter { this.emit('error', error); return this.close(); }); + + this.tokenStreamParser.on('drain', () => { + // Bridge the release of backpressure from the token stream parser + // transform to the packet stream transform. + this.messageIo.resume(); + }); } connect() { @@ -973,10 +987,33 @@ class Connection extends EventEmitter { } } + // Returns false to apply backpressure. sendDataToTokenStreamParser(data) { return this.tokenStreamParser.addBuffer(data); } + // This is an internal method that is called from Request.pause(). + // It has to check whether the passed Request object represents the currently + // active request, because the application might have called Request.pause() + // on an old inactive Request object. + pauseRequest(request) { + if (this.isRequestActive(request)) { + this.tokenStreamParser.pause(); + } + } + + // This is an internal method that is called from Request.resume(). + resumeRequest(request) { + if (this.isRequestActive(request)) { + this.tokenStreamParser.resume(); + } + } + + // Returns true if the passed request is the currently active request of the connection. + isRequestActive(request) { + return request === this.request && this.state === this.STATE.SENT_CLIENT_REQUEST; + } + sendInitialSql() { const payload = new SqlBatchPayload(this.getInitialSql(), this.currentTransactionDescriptor(), this.config.options); return this.messageIo.sendMessage(TYPE.SQL_BATCH, payload.data); @@ -1266,6 +1303,7 @@ class Connection extends EventEmitter { } this.request = request; + this.request.connection = this; this.request.rowCount = 0; this.request.rows = []; this.request.rst = []; @@ -1275,7 +1313,10 @@ class Connection extends EventEmitter { this.debug.payload(function() { return payload.toString(' '); }); - return this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); + this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); + if (request.paused) { // Request.pause() has been called before the request was started + this.pauseRequest(request); + } } } @@ -1563,6 +1604,7 @@ Connection.prototype.STATE = { name: 'SentClientRequest', exit: function() { this.clearRequestTimer(); + this.tokenStreamParser.resume(); }, events: { socketError: function(err) { @@ -1573,9 +1615,20 @@ Connection.prototype.STATE = { }, data: function(data) { this.clearRequestTimer(); // request timer is stopped on first data package - return this.sendDataToTokenStreamParser(data); + const ret = this.sendDataToTokenStreamParser(data); + if (ret === false) { + // Bridge backpressure from the token stream parser transform to the + // packet stream transform. + this.messageIo.pause(); + } }, message: function() { + // We have to channel the 'message' (EOM) event through the token stream + // parser transform, to keep it in line with the flow of the tokens, when + // the incoming data flow is paused and resumed. + return this.tokenStreamParser.addEndOfMessageMarker(); + }, + endOfMessageMarkerReceived: function() { this.transitionTo(this.STATE.LOGGED_IN); const sqlRequest = this.request; this.request = undefined; diff --git a/src/message-io.js b/src/message-io.js index cad41c8c3..ce3a15ead 100644 --- a/src/message-io.js +++ b/src/message-io.js @@ -179,4 +179,14 @@ module.exports = class MessageIO extends EventEmitter { this.debug.packet(direction, packet); return this.debug.data(packet); } + + // Temporarily suspends the flow of incoming packets. + pause() { + this.packetStream.pause(); + } + + // Resumes the flow of incoming packets. + resume() { + this.packetStream.resume(); + } }; diff --git a/src/request.js b/src/request.js index 67dc5c841..21588ab93 100644 --- a/src/request.js +++ b/src/request.js @@ -7,10 +7,11 @@ module.exports = class Request extends EventEmitter { super(); this.sqlTextOrProcedure = sqlTextOrProcedure; - this.callback = callback; this.parameters = []; this.parametersByName = {}; - this.userCallback = this.callback; + this.canceled = false; + this.paused = false; + this.userCallback = callback; this.callback = function() { if (this.preparing) { this.emit('prepared'); @@ -134,4 +135,27 @@ module.exports = class Request extends EventEmitter { } return null; } + + // Temporarily suspends the flow of data from the database. + // No more 'row' events will be emitted until resume() is called. + pause() { + if (this.paused) { + return; + } + this.paused = true; + if (this.connection) { + this.connection.pauseRequest(this); + } + } + + // Resumes the flow of data from the database. + resume() { + if (!this.paused) { + return; + } + this.paused = false; + if (this.connection) { + this.connection.resumeRequest(this); + } + } }; diff --git a/src/token/stream-parser.js b/src/token/stream-parser.js index f68fde41c..d7280d806 100644 --- a/src/token/stream-parser.js +++ b/src/token/stream-parser.js @@ -24,6 +24,7 @@ module.exports = class Parser extends Transform { this.debug = debug; this.colMetadata = colMetadata; this.options = options; + this.endOfMessageMarker = {}; this.buffer = new Buffer(0); this.position = 0; @@ -33,6 +34,13 @@ module.exports = class Parser extends Transform { } _transform(input, encoding, done) { + if (input === this.endOfMessageMarker) { + done(null, { // generate endOfMessage pseudo token + name: 'EOM', + event: 'endOfMessage' + }); + return; + } if (this.position === this.buffer.length) { this.buffer = input; } else { diff --git a/src/token/token-stream-parser.js b/src/token/token-stream-parser.js index 859e7effb..6368f1bbf 100644 --- a/src/token/token-stream-parser.js +++ b/src/token/token-stream-parser.js @@ -25,14 +25,36 @@ class Parser extends EventEmitter { this.emit(token.event, token); } }); + this.parser.on('drain', () => { + this.emit('drain'); + }); } + // Returns false to apply backpressure. addBuffer(buffer) { return this.parser.write(buffer); } + // Writes an end-of-message (EOM) marker into the parser transform input + // queue. StreamParser will emit a 'data' event with an 'endOfMessage' + // pseudo token when the EOM marker has passed through the transform stream. + // Returns false to apply backpressure. + addEndOfMessageMarker() { + return this.parser.write(this.parser.endOfMessageMarker); + } + isEnd() { return this.parser.buffer.length === this.parser.position; } + + // Temporarily suspends the token stream parser transform from emitting events. + pause() { + this.parser.pause(); + } + + // Resumes the token stream parser transform. + resume() { + this.parser.resume(); + } } module.exports.Parser = Parser; diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js new file mode 100644 index 000000000..55bcde129 --- /dev/null +++ b/test/integration/pause-resume-test.js @@ -0,0 +1,250 @@ +// This module contains tests cases for the Request.pause()/resume() methods. + +'use strict'; + +const Connection = require('../../src/connection'); +const Request = require('../../src/request'); +const fs = require('fs'); +const semver = require('semver'); + +function getConfig() { + const config = JSON.parse(fs.readFileSync(process.env.HOME + '/.tedious/test-connection.json', 'utf8')).config; + config.options.tdsVersion = process.env.TEDIOUS_TDS_VERSION; + return config; +} + +exports.setUp = function(setUpDone) { + const connection = new Connection(getConfig()); + connection.on('connect', (err) => { + if (err) { + setUpDone(err); + return; + } + this.connection = connection; + setUpDone(); + }); + connection.on('end', () => { + this.connection = undefined; + }); +}; + +exports.tearDown = function(tearDownDone) { + const connection = this.connection; + if (!connection) { + tearDownDone(); + return; + } + connection.on('end', function() { + tearDownDone(); + }); + connection.close(); +}; + +// This test reads a large number of rows from the database. +// At 1/4 of the rows, Request.pause() is called. +// After a delay, Request.resume() is called. +// This test verifies that: +// - No 'row' events are received during the pause. +// - The socket and the two transforms are stopped during the pause. +// - No large amounts of data are accumulated within the transforms during the pause. +// - No data is lost. +exports.testLargeQuery = function(test) { + const debugMode = false; + const totalRows = 200000; // total number of rows to read + const delayTime = 500; // pause delay time in ms + const connection = this.connection; + let request; + let rowsReceived = 0; + let failed = false; // used to suppress further error messages + let paused = false; + + connection.on('error', function(err) { + test.ifError(err); + }); + openRequest(); + + function openRequest() { + const sql = // recursive CTE to generate rows + 'with cte1 as ' + + '(select 1 as i union all select i + 1 from cte1 where i < ' + totalRows + ') ' + + 'select i from cte1 option (maxrecursion 0)'; + request = new Request(sql, onRequestCompletion); + request.on('row', processRow); + connection.execSql(request); + } + + function onRequestCompletion(err) { + test.ifError(err); + test.equal(rowsReceived, totalRows, 'Invalid row count.'); + test.done(); + } + + function processRow(columns) { + if (paused) { + fail('Row received in paused state.'); + } + rowsReceived++; + if (columns[0].value !== rowsReceived) { + fail('Invalid row counter value, value=' + columns[0].value + ', expected=' + rowsReceived + '.'); + return; + } + if (rowsReceived === Math.round(totalRows / 4)) { + pause(); + } + } + + function pause() { + if (debugMode) { + dumpStreamStates(connection); + console.log('Start pause.'); + } + paused = true; + request.pause(); + setTimeout(resume, delayTime); + } + + function resume() { + if (debugMode) { + console.log('End pause.'); + dumpStreamStates(connection); + } + verifyStreamStatesAfterPause(); + paused = false; + request.resume(); + } + + function verifyStreamStatesAfterPause() { + const packetSize = connection.messageIo.packetSize(); + const socketRs = connection.socket._readableState; + if (semver.gte(process.version, '0.12.18')) { + test.ok(!socketRs.flowing, + 'Socket is not paused.'); + } + const minimalSocketFillTestLevel = 0x4000; // (heuristic value) + const highWaterReserve = 512; // (heuristic value) + test.ok(socketRs.length >= Math.min(socketRs.highWaterMark - highWaterReserve, minimalSocketFillTestLevel), + 'Socket does not feel backpressure.'); + const packetTransformWs = connection.messageIo.packetStream._writableState; + const packetTransformRs = connection.messageIo.packetStream._readableState; + test.ok(!packetTransformRs.flowing, + 'Packet transform is not paused.'); + test.ok(packetTransformWs.length <= packetTransformWs.highWaterMark && + packetTransformRs.length <= packetTransformRs.highWaterMark, + 'Packet transform has large amount of data buffered.'); + const tokenTransformWs = connection.tokenStreamParser.parser._writableState; + const tokenTransformRs = connection.tokenStreamParser.parser._readableState; + test.ok(!tokenTransformRs.flowing, + 'Token transform is not paused.'); + test.ok(tokenTransformWs.length <= tokenTransformWs.highWaterMark, + 'Token transform input buffer overflow.'); + test.ok(tokenTransformRs.length < packetSize / 3, + 'Token transform output buffer has large amount of data buffered.'); + } + + function fail(msg) { + if (failed) { + return; + } + failed = true; + test.ok(false, msg); + connection.close(); + } +}; + +// This test reads only a few rows and makes a short pause after each row. +// This test verifies that: +// - Pause/resume works correctly when applied after the last packet of a TDS +// message has already been dispatched by MessageIO.ReadablePacketStream. +// This is the case when EOM / packet.isLast() has already been detected +// at the time when Request.pause() is called. +// The 'message' event emitted by MessageIO has to be channeled through +// the token parser transform. +// - No more 'row' events are emitted after a paused request has been canceled. +// - The internal data flow is resumed after a paused request has been canceled. +exports.testTransitions = function(test) { + const totalRequests = 3; + const rowsPerRequest = 4; + const delayTime = 100; // pause delay time in ms + const requestToCancel = 2; // 1-based position of request to be canceled + const rowToCancel = 2; // 1-based position of row at which connection.cancel() will be called + const connection = this.connection; + let request; + let requestCount = 0; + let rowCount; + let paused = false; + let canceled = false; + + connection.on('error', function(err) { + test.ifError(err); + }); + openRequest(); + + function openRequest() { + let sql = 'select 1'; + for (let i = 2; i <= rowsPerRequest; i++) { + sql = sql + ' union all select ' + i; + } + request = new Request(sql, onRequestCompletion); + request.on('row', processRow); + rowCount = 0; + paused = false; + canceled = false; + connection.execSql(request); + } + + function onRequestCompletion(err) { + requestCount++; + if (requestCount === requestToCancel) { + test.ok(err && err.code === 'ECANCEL'); + test.equal(rowCount, rowToCancel); + } else { + test.ifError(err); + test.equal(rowCount, rowsPerRequest); + } + if (requestCount < totalRequests) { + openRequest(); + } else { + test.done(); + } + } + + function processRow(columns) { + test.ok(!canceled, 'Row received in canceled state, requestCount=' + requestCount + ' rowCount=' + rowCount); + test.ok(!paused, 'Row received in paused state, requestCount=' + requestCount + ' rowCount=' + rowCount); + rowCount++; + test.equal(columns[0].value, rowCount); + paused = true; + request.pause(); + setTimeout(afterDelay, delayTime); + } + + function afterDelay() { + if (requestCount === requestToCancel - 1 && rowCount === rowToCancel) { + canceled = true; + connection.cancel(); + } else { + paused = false; + request.resume(); + } + } +}; + +function dumpStreamStates(connection) { + dumpStreamState('Socket', connection.socket); + dumpStreamState('Packet transform', connection.messageIo.packetStream); + dumpStreamState('Token transform', connection.tokenStreamParser.parser); +} + +function dumpStreamState(name, stream) { + console.log(); + console.log(name + ' state:'); + const ws = stream._writableState; + console.log(' ws.length: ' + ws.length); + console.log(' ws.bufferedRequestCount: ' + ws.bufferedRequestCount); + console.log(' ws.highWaterMark: ' + ws.highWaterMark); + const rs = stream._readableState; + console.log(' rs.length: ' + rs.length); + console.log(' rs.buffer.length: ' + rs.buffer.length); + console.log(' rs.highWaterMark: ' + rs.highWaterMark); + console.log(' rs.flowing: ' + rs.flowing); +} From 1c8597bb9f6fa285d7d921901fefdc3dd813e8a7 Mon Sep 17 00:00:00 2001 From: Christian d'Heureuse Date: Sat, 11 Mar 2017 02:11:00 +0100 Subject: [PATCH 02/13] extended test cases for pause/resume --- test/integration/pause-resume-test.js | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 55bcde129..164f9202c 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -10,6 +10,7 @@ const semver = require('semver'); function getConfig() { const config = JSON.parse(fs.readFileSync(process.env.HOME + '/.tedious/test-connection.json', 'utf8')).config; config.options.tdsVersion = process.env.TEDIOUS_TDS_VERSION; + config.options.requestTimeout = 250; // 250 ms timeout until the first response package is received return config; } @@ -48,6 +49,8 @@ exports.tearDown = function(tearDownDone) { // - The socket and the two transforms are stopped during the pause. // - No large amounts of data are accumulated within the transforms during the pause. // - No data is lost. +// - The request timer does not cancel the query when it takes longer to +// receive all the rows. exports.testLargeQuery = function(test) { const debugMode = false; const totalRows = 200000; // total number of rows to read @@ -161,12 +164,14 @@ exports.testLargeQuery = function(test) { // the token parser transform. // - No more 'row' events are emitted after a paused request has been canceled. // - The internal data flow is resumed after a paused request has been canceled. +// - A request can be paused before Connection.execSql() is called. exports.testTransitions = function(test) { - const totalRequests = 3; + const totalRequests = 4; const rowsPerRequest = 4; const delayTime = 100; // pause delay time in ms const requestToCancel = 2; // 1-based position of request to be canceled const rowToCancel = 2; // 1-based position of row at which connection.cancel() will be called + const requestToStartPaused = 4; // 1-based position of request to start paused const connection = this.connection; let request; let requestCount = 0; @@ -189,6 +194,9 @@ exports.testTransitions = function(test) { rowCount = 0; paused = false; canceled = false; + if (requestCount === requestToStartPaused - 1) { + pause(); + } connection.execSql(request); } @@ -213,6 +221,10 @@ exports.testTransitions = function(test) { test.ok(!paused, 'Row received in paused state, requestCount=' + requestCount + ' rowCount=' + rowCount); rowCount++; test.equal(columns[0].value, rowCount); + pause(); + } + + function pause() { paused = true; request.pause(); setTimeout(afterDelay, delayTime); From 562bb7be6d045ce2b9962163fb929246c9ce501a Mon Sep 17 00:00:00 2001 From: Christian d'Heureuse Date: Fri, 6 Oct 2017 03:52:14 +0200 Subject: [PATCH 03/13] Removed 'use strict' from pause-resume-test.js --- test/integration/pause-resume-test.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 164f9202c..0740e69fe 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -1,7 +1,5 @@ // This module contains tests cases for the Request.pause()/resume() methods. -'use strict'; - const Connection = require('../../src/connection'); const Request = require('../../src/request'); const fs = require('fs'); From b276316b69b41282f8aa94d1b0b7d9be83c9c5c3 Mon Sep 17 00:00:00 2001 From: Christian d'Heureuse Date: Tue, 10 Oct 2017 23:06:06 +0200 Subject: [PATCH 04/13] Increase delay time in pause/resume test to 1s, because Travis CI is slow --- test/integration/pause-resume-test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 0740e69fe..8acf4cb19 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -52,7 +52,7 @@ exports.tearDown = function(tearDownDone) { exports.testLargeQuery = function(test) { const debugMode = false; const totalRows = 200000; // total number of rows to read - const delayTime = 500; // pause delay time in ms + const delayTime = 1000; // pause delay time in ms const connection = this.connection; let request; let rowsReceived = 0; @@ -121,7 +121,7 @@ exports.testLargeQuery = function(test) { test.ok(!socketRs.flowing, 'Socket is not paused.'); } - const minimalSocketFillTestLevel = 0x4000; // (heuristic value) + const minimalSocketFillTestLevel = 0x2000; // (heuristic value) const highWaterReserve = 512; // (heuristic value) test.ok(socketRs.length >= Math.min(socketRs.highWaterMark - highWaterReserve, minimalSocketFillTestLevel), 'Socket does not feel backpressure.'); From 27e72af989c33c0ff38ea3284912d30dd0261fe3 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 11 Oct 2017 17:32:55 -0700 Subject: [PATCH 05/13] test: Remove reliance on `semver`. As `tedious` only supports Node.JS 4.x and later, we don't need to check for 0.12.x or later in the tests. --- test/integration/pause-resume-test.js | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 8acf4cb19..55ee29d9c 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -1,9 +1,7 @@ -// This module contains tests cases for the Request.pause()/resume() methods. +const fs = require('fs'); const Connection = require('../../src/connection'); const Request = require('../../src/request'); -const fs = require('fs'); -const semver = require('semver'); function getConfig() { const config = JSON.parse(fs.readFileSync(process.env.HOME + '/.tedious/test-connection.json', 'utf8')).config; @@ -117,10 +115,8 @@ exports.testLargeQuery = function(test) { function verifyStreamStatesAfterPause() { const packetSize = connection.messageIo.packetSize(); const socketRs = connection.socket._readableState; - if (semver.gte(process.version, '0.12.18')) { - test.ok(!socketRs.flowing, - 'Socket is not paused.'); - } + test.ok(!socketRs.flowing, 'Socket is not paused.'); + const minimalSocketFillTestLevel = 0x2000; // (heuristic value) const highWaterReserve = 512; // (heuristic value) test.ok(socketRs.length >= Math.min(socketRs.highWaterMark - highWaterReserve, minimalSocketFillTestLevel), From 0d2b2cc9e823e659340eeb90e181c5fa1b64495b Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 11 Oct 2017 17:36:59 -0700 Subject: [PATCH 06/13] test: Simplify the pause-resume setup/teardown. --- test/integration/pause-resume-test.js | 32 +++++++-------------------- 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 55ee29d9c..817bce695 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -6,35 +6,19 @@ const Request = require('../../src/request'); function getConfig() { const config = JSON.parse(fs.readFileSync(process.env.HOME + '/.tedious/test-connection.json', 'utf8')).config; config.options.tdsVersion = process.env.TEDIOUS_TDS_VERSION; - config.options.requestTimeout = 250; // 250 ms timeout until the first response package is received + // 250 ms timeout until the first response package is received + config.options.requestTimeout = 250; return config; } -exports.setUp = function(setUpDone) { - const connection = new Connection(getConfig()); - connection.on('connect', (err) => { - if (err) { - setUpDone(err); - return; - } - this.connection = connection; - setUpDone(); - }); - connection.on('end', () => { - this.connection = undefined; - }); +exports.setUp = function(done) { + this.connection = new Connection(getConfig()); + this.connection.on('connect', done); }; -exports.tearDown = function(tearDownDone) { - const connection = this.connection; - if (!connection) { - tearDownDone(); - return; - } - connection.on('end', function() { - tearDownDone(); - }); - connection.close(); +exports.tearDown = function(done) { + this.connection.on('end', done); + this.connection.close(); }; // This test reads a large number of rows from the database. From e81686cb0531ba91150325e107ab0da52fc75e44 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 11 Oct 2017 17:56:52 -0700 Subject: [PATCH 07/13] test: Align the coding style of the pause-resume tests a bit. --- test/integration/pause-resume-test.js | 61 ++++++++++++--------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 817bce695..a4e0367aa 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -33,28 +33,27 @@ exports.tearDown = function(done) { // receive all the rows. exports.testLargeQuery = function(test) { const debugMode = false; - const totalRows = 200000; // total number of rows to read - const delayTime = 1000; // pause delay time in ms + // total number of rows to read + const totalRows = 200000; + // pause delay time in ms + const delayTime = 1000; const connection = this.connection; - let request; let rowsReceived = 0; - let failed = false; // used to suppress further error messages let paused = false; connection.on('error', function(err) { test.ifError(err); }); - openRequest(); - function openRequest() { - const sql = // recursive CTE to generate rows - 'with cte1 as ' + - '(select 1 as i union all select i + 1 from cte1 where i < ' + totalRows + ') ' + - 'select i from cte1 option (maxrecursion 0)'; - request = new Request(sql, onRequestCompletion); - request.on('row', processRow); - connection.execSql(request); - } + // recursive CTE to generate rows + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < ${totalRows}) + select i from cte1 option (maxrecursion 0) + `; + const request = new Request(sql, onRequestCompletion); + request.on('row', processRow); + connection.execSql(request); function onRequestCompletion(err) { test.ifError(err); @@ -64,13 +63,14 @@ exports.testLargeQuery = function(test) { function processRow(columns) { if (paused) { - fail('Row received in paused state.'); + test.ok(false, 'Row received in paused state.'); } rowsReceived++; + if (columns[0].value !== rowsReceived) { - fail('Invalid row counter value, value=' + columns[0].value + ', expected=' + rowsReceived + '.'); - return; + test.ok(false, `Invalid row counter value, value=${columns[0].value}, expected=${rowsReceived}.`); } + if (rowsReceived === Math.round(totalRows / 4)) { pause(); } @@ -121,15 +121,6 @@ exports.testLargeQuery = function(test) { test.ok(tokenTransformRs.length < packetSize / 3, 'Token transform output buffer has large amount of data buffered.'); } - - function fail(msg) { - if (failed) { - return; - } - failed = true; - test.ok(false, msg); - connection.close(); - } }; // This test reads only a few rows and makes a short pause after each row. @@ -146,10 +137,14 @@ exports.testLargeQuery = function(test) { exports.testTransitions = function(test) { const totalRequests = 4; const rowsPerRequest = 4; - const delayTime = 100; // pause delay time in ms - const requestToCancel = 2; // 1-based position of request to be canceled - const rowToCancel = 2; // 1-based position of row at which connection.cancel() will be called - const requestToStartPaused = 4; // 1-based position of request to start paused + // pause delay time in ms + const delayTime = 100; + // 1-based position of request to be canceled + const requestToCancel = 2; + // 1-based position of row at which connection.cancel() will be called + const rowToCancel = 2; + // 1-based position of request to start paused + const requestToStartPaused = 4; const connection = this.connection; let request; let requestCount = 0; @@ -165,7 +160,7 @@ exports.testTransitions = function(test) { function openRequest() { let sql = 'select 1'; for (let i = 2; i <= rowsPerRequest; i++) { - sql = sql + ' union all select ' + i; + sql = `${sql} union all select ${i}`; } request = new Request(sql, onRequestCompletion); request.on('row', processRow); @@ -195,8 +190,8 @@ exports.testTransitions = function(test) { } function processRow(columns) { - test.ok(!canceled, 'Row received in canceled state, requestCount=' + requestCount + ' rowCount=' + rowCount); - test.ok(!paused, 'Row received in paused state, requestCount=' + requestCount + ' rowCount=' + rowCount); + test.ok(!canceled, `Row received in canceled state, requestCount=${requestCount} rowCount=${rowCount}`); + test.ok(!paused, `Row received in paused state, requestCount=${requestCount} rowCount=${rowCount}`); rowCount++; test.equal(columns[0].value, rowCount); pause(); From b16bdeaee0449c316d0255297462279895c986db Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 11 Oct 2017 18:32:10 -0700 Subject: [PATCH 08/13] test: Break up complex test case into smaller test cases. --- test/integration/pause-resume-test.js | 146 ++++++++++++-------------- 1 file changed, 65 insertions(+), 81 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index a4e0367aa..0c1409a66 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -123,95 +123,79 @@ exports.testLargeQuery = function(test) { } }; -// This test reads only a few rows and makes a short pause after each row. -// This test verifies that: -// - Pause/resume works correctly when applied after the last packet of a TDS -// message has already been dispatched by MessageIO.ReadablePacketStream. -// This is the case when EOM / packet.isLast() has already been detected -// at the time when Request.pause() is called. -// The 'message' event emitted by MessageIO has to be channeled through -// the token parser transform. -// - No more 'row' events are emitted after a paused request has been canceled. -// - The internal data flow is resumed after a paused request has been canceled. -// - A request can be paused before Connection.execSql() is called. -exports.testTransitions = function(test) { - const totalRequests = 4; - const rowsPerRequest = 4; - // pause delay time in ms - const delayTime = 100; - // 1-based position of request to be canceled - const requestToCancel = 2; - // 1-based position of row at which connection.cancel() will be called - const rowToCancel = 2; - // 1-based position of request to start paused - const requestToStartPaused = 4; - const connection = this.connection; - let request; - let requestCount = 0; - let rowCount; - let paused = false; - let canceled = false; +exports.testPausedRequestCanBeCancelled = function(test) { + this.connection.on('error', (err) => { + test.ifError(err); + }); - connection.on('error', function(err) { + const pauseAndCancelRequest = (next) => { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < 20000) + select i from cte1 option (maxrecursion 0) + `; + + const request = new Request(sql, (error) => { + test.ok(error); + test.strictEqual(error.code, 'ECANCEL'); + + next(); + }); + + request.on('row', (columns) => { + if (columns[0].value == 1000) { + request.pause(); + + setTimeout(() => { + this.connection.cancel(); + }, 100); + } else if (columns[0].value > 1000) { + test.ok(false, 'Received rows after pause'); + } + }); + + this.connection.execSql(request); + }; + + pauseAndCancelRequest(() => { + const request = new Request('SELECT 1', (error) => { + test.ifError(error); + test.done(); + }); + + request.on('row', (columns) => { + test.strictEqual(columns[0].value, 1); + }); + + this.connection.execSql(request); + }); +}; + +exports.testImmediatelyPausedRequestDoesNotEmitRowsUntilResumed = function(test) { + this.connection.on('error', (err) => { test.ifError(err); }); - openRequest(); - function openRequest() { - let sql = 'select 1'; - for (let i = 2; i <= rowsPerRequest; i++) { - sql = `${sql} union all select ${i}`; - } - request = new Request(sql, onRequestCompletion); - request.on('row', processRow); - rowCount = 0; - paused = false; - canceled = false; - if (requestCount === requestToStartPaused - 1) { - pause(); - } - connection.execSql(request); - } + const request = new Request('SELECT 1', (error) => { + test.ifError(error); + test.done(); + }); - function onRequestCompletion(err) { - requestCount++; - if (requestCount === requestToCancel) { - test.ok(err && err.code === 'ECANCEL'); - test.equal(rowCount, rowToCancel); - } else { - test.ifError(err); - test.equal(rowCount, rowsPerRequest); - } - if (requestCount < totalRequests) { - openRequest(); - } else { - test.done(); - } - } + let paused = true; + request.pause(); - function processRow(columns) { - test.ok(!canceled, `Row received in canceled state, requestCount=${requestCount} rowCount=${rowCount}`); - test.ok(!paused, `Row received in paused state, requestCount=${requestCount} rowCount=${rowCount}`); - rowCount++; - test.equal(columns[0].value, rowCount); - pause(); - } + request.on('row', (columns) => { + test.ok(!paused); - function pause() { - paused = true; - request.pause(); - setTimeout(afterDelay, delayTime); - } + test.strictEqual(columns[0].value, 1); + }); - function afterDelay() { - if (requestCount === requestToCancel - 1 && rowCount === rowToCancel) { - canceled = true; - connection.cancel(); - } else { - paused = false; - request.resume(); - } - } + this.connection.execSql(request); + + setTimeout(() => { + paused = false; + request.resume(); + }, 100); }; function dumpStreamStates(connection) { From 9a45d51b369b3feaf9d4a2af5db01230d23a5e51 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 11 Oct 2017 18:37:57 -0700 Subject: [PATCH 09/13] test: Move helper functions around. --- test/integration/pause-resume-test.js | 40 +++++++++++++-------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 0c1409a66..60e7c6ea8 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -21,6 +21,26 @@ exports.tearDown = function(done) { this.connection.close(); }; +function dumpStreamStates(connection) { + dumpStreamState('Socket', connection.socket); + dumpStreamState('Packet transform', connection.messageIo.packetStream); + dumpStreamState('Token transform', connection.tokenStreamParser.parser); +} + +function dumpStreamState(name, stream) { + console.log(); + console.log(name + ' state:'); + const ws = stream._writableState; + console.log(' ws.length: ' + ws.length); + console.log(' ws.bufferedRequestCount: ' + ws.bufferedRequestCount); + console.log(' ws.highWaterMark: ' + ws.highWaterMark); + const rs = stream._readableState; + console.log(' rs.length: ' + rs.length); + console.log(' rs.buffer.length: ' + rs.buffer.length); + console.log(' rs.highWaterMark: ' + rs.highWaterMark); + console.log(' rs.flowing: ' + rs.flowing); +} + // This test reads a large number of rows from the database. // At 1/4 of the rows, Request.pause() is called. // After a delay, Request.resume() is called. @@ -197,23 +217,3 @@ exports.testImmediatelyPausedRequestDoesNotEmitRowsUntilResumed = function(test) request.resume(); }, 100); }; - -function dumpStreamStates(connection) { - dumpStreamState('Socket', connection.socket); - dumpStreamState('Packet transform', connection.messageIo.packetStream); - dumpStreamState('Token transform', connection.tokenStreamParser.parser); -} - -function dumpStreamState(name, stream) { - console.log(); - console.log(name + ' state:'); - const ws = stream._writableState; - console.log(' ws.length: ' + ws.length); - console.log(' ws.bufferedRequestCount: ' + ws.bufferedRequestCount); - console.log(' ws.highWaterMark: ' + ws.highWaterMark); - const rs = stream._readableState; - console.log(' rs.length: ' + rs.length); - console.log(' rs.buffer.length: ' + rs.buffer.length); - console.log(' rs.highWaterMark: ' + rs.highWaterMark); - console.log(' rs.flowing: ' + rs.flowing); -} From c4bfb347264f4f181c6f1db5b1b3a7791e4c1b3d Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 12 Oct 2017 08:46:08 -0700 Subject: [PATCH 10/13] fix: Don't resume the token stream parser on connection close. --- src/connection.js | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/connection.js b/src/connection.js index dc9395543..44ab7b3a3 100644 --- a/src/connection.js +++ b/src/connection.js @@ -780,7 +780,7 @@ class Connection extends EventEmitter { } if (this.state && this.state.exit) { - this.state.exit.apply(this); + this.state.exit.call(this, newState); } this.debug.log('State change: ' + (this.state ? this.state.name : undefined) + ' -> ' + newState.name); @@ -1602,9 +1602,12 @@ Connection.prototype.STATE = { }, SENT_CLIENT_REQUEST: { name: 'SentClientRequest', - exit: function() { + exit: function(nextState) { this.clearRequestTimer(); - this.tokenStreamParser.resume(); + + if (nextState !== this.STATE.FINAL) { + this.tokenStreamParser.resume(); + } }, events: { socketError: function(err) { From bb7207f778355452db0e6c7c750b2fbf545ea8f4 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 12 Oct 2017 08:47:10 -0700 Subject: [PATCH 11/13] test: Further breakdown of test cases. Also, let's not access Node.js internal APIs inside the tests. --- test/integration/pause-resume-test.js | 185 ++++++++++++-------------- 1 file changed, 82 insertions(+), 103 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 60e7c6ea8..47af8e667 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -17,130 +17,109 @@ exports.setUp = function(done) { }; exports.tearDown = function(done) { - this.connection.on('end', done); - this.connection.close(); + if (this.connection.closed) { + done(); + } else { + this.connection.on('end', done); + this.connection.close(); + } }; -function dumpStreamStates(connection) { - dumpStreamState('Socket', connection.socket); - dumpStreamState('Packet transform', connection.messageIo.packetStream); - dumpStreamState('Token transform', connection.tokenStreamParser.parser); -} - -function dumpStreamState(name, stream) { - console.log(); - console.log(name + ' state:'); - const ws = stream._writableState; - console.log(' ws.length: ' + ws.length); - console.log(' ws.bufferedRequestCount: ' + ws.bufferedRequestCount); - console.log(' ws.highWaterMark: ' + ws.highWaterMark); - const rs = stream._readableState; - console.log(' rs.length: ' + rs.length); - console.log(' rs.buffer.length: ' + rs.buffer.length); - console.log(' rs.highWaterMark: ' + rs.highWaterMark); - console.log(' rs.flowing: ' + rs.flowing); -} +exports.testPausedRequestDoesNotEmitRowsAfterConnectionClose = function(test) { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < 20000) + select i from cte1 option (maxrecursion 0) + `; -// This test reads a large number of rows from the database. -// At 1/4 of the rows, Request.pause() is called. -// After a delay, Request.resume() is called. -// This test verifies that: -// - No 'row' events are received during the pause. -// - The socket and the two transforms are stopped during the pause. -// - No large amounts of data are accumulated within the transforms during the pause. -// - No data is lost. -// - The request timer does not cancel the query when it takes longer to -// receive all the rows. -exports.testLargeQuery = function(test) { - const debugMode = false; - // total number of rows to read - const totalRows = 200000; - // pause delay time in ms - const delayTime = 1000; - const connection = this.connection; - let rowsReceived = 0; - let paused = false; + const request = new Request(sql, (error) => { + test.ok(error); + }); - connection.on('error', function(err) { - test.ifError(err); + request.on('row', (columns) => { + if (columns[0].value == 1000) { + request.pause(); + + setTimeout(() => { + this.connection.on('end', () => { + process.nextTick(() => { + test.done() + }); + }); + this.connection.close(); + }, 100); + } }); - // recursive CTE to generate rows + this.connection.execSql(request); +}; + +exports.testPausedRequestCanBeResumed = function(test) { const sql = ` with cte1 as - (select 1 as i union all select i + 1 from cte1 where i < ${totalRows}) + (select 1 as i union all select i + 1 from cte1 where i < 20000) select i from cte1 option (maxrecursion 0) `; - const request = new Request(sql, onRequestCompletion); - request.on('row', processRow); - connection.execSql(request); - function onRequestCompletion(err) { - test.ifError(err); - test.equal(rowsReceived, totalRows, 'Invalid row count.'); + let rowsReceived = 0; + let paused = false; + + const request = new Request(sql, (error) => { + test.ifError(error); + + test.strictEqual(rowsReceived, 20000); + test.done(); - } + }); + + request.on('row', (columns) => { + test.ok(!paused); - function processRow(columns) { - if (paused) { - test.ok(false, 'Row received in paused state.'); - } rowsReceived++; - if (columns[0].value !== rowsReceived) { - test.ok(false, `Invalid row counter value, value=${columns[0].value}, expected=${rowsReceived}.`); - } + test.strictEqual(columns[0].value, rowsReceived); - if (rowsReceived === Math.round(totalRows / 4)) { - pause(); - } - } + if (columns[0].value == 1000) { + paused = true; + request.pause(); - function pause() { - if (debugMode) { - dumpStreamStates(connection); - console.log('Start pause.'); + setTimeout(() => { + paused = false; + request.resume(); + }, 100); } - paused = true; - request.pause(); - setTimeout(resume, delayTime); - } + }); + + this.connection.execSql(request); +}; + +exports.testPausingRequestPausesTransforms = function(test) { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < 20000) + select i from cte1 option (maxrecursion 0) + `; - function resume() { - if (debugMode) { - console.log('End pause.'); - dumpStreamStates(connection); + const request = new Request(sql, (error) => { + test.ifError(error); + + test.done(); + }); + + request.on('row', (columns) => { + if (columns[0].value == 1000) { + request.pause(); + + setTimeout(() => { + test.ok(this.connection.messageIo.packetStream.isPaused()); + test.ok(this.connection.tokenStreamParser.parser.isPaused()); + + request.resume(); + }, 100); } - verifyStreamStatesAfterPause(); - paused = false; - request.resume(); - } + }); - function verifyStreamStatesAfterPause() { - const packetSize = connection.messageIo.packetSize(); - const socketRs = connection.socket._readableState; - test.ok(!socketRs.flowing, 'Socket is not paused.'); - - const minimalSocketFillTestLevel = 0x2000; // (heuristic value) - const highWaterReserve = 512; // (heuristic value) - test.ok(socketRs.length >= Math.min(socketRs.highWaterMark - highWaterReserve, minimalSocketFillTestLevel), - 'Socket does not feel backpressure.'); - const packetTransformWs = connection.messageIo.packetStream._writableState; - const packetTransformRs = connection.messageIo.packetStream._readableState; - test.ok(!packetTransformRs.flowing, - 'Packet transform is not paused.'); - test.ok(packetTransformWs.length <= packetTransformWs.highWaterMark && - packetTransformRs.length <= packetTransformRs.highWaterMark, - 'Packet transform has large amount of data buffered.'); - const tokenTransformWs = connection.tokenStreamParser.parser._writableState; - const tokenTransformRs = connection.tokenStreamParser.parser._readableState; - test.ok(!tokenTransformRs.flowing, - 'Token transform is not paused.'); - test.ok(tokenTransformWs.length <= tokenTransformWs.highWaterMark, - 'Token transform input buffer overflow.'); - test.ok(tokenTransformRs.length < packetSize / 3, - 'Token transform output buffer has large amount of data buffered.'); - } + this.connection.execSql(request); }; exports.testPausedRequestCanBeCancelled = function(test) { From 1747b08e3eea045ba262887f8871e62840d271d8 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 12 Oct 2017 08:53:28 -0700 Subject: [PATCH 12/13] fix: Add missing semicolon. --- test/integration/pause-resume-test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 47af8e667..28d22d6bd 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -43,7 +43,7 @@ exports.testPausedRequestDoesNotEmitRowsAfterConnectionClose = function(test) { setTimeout(() => { this.connection.on('end', () => { process.nextTick(() => { - test.done() + test.done(); }); }); this.connection.close(); From a493920e4e83f92e0c2abe6a8182113c1df9a8c7 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 12 Oct 2017 10:27:06 -0700 Subject: [PATCH 13/13] test: Try increasing the delay. --- test/integration/pause-resume-test.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/integration/pause-resume-test.js b/test/integration/pause-resume-test.js index 28d22d6bd..2401f3a79 100644 --- a/test/integration/pause-resume-test.js +++ b/test/integration/pause-resume-test.js @@ -47,7 +47,7 @@ exports.testPausedRequestDoesNotEmitRowsAfterConnectionClose = function(test) { }); }); this.connection.close(); - }, 100); + }, 200); } }); @@ -86,7 +86,7 @@ exports.testPausedRequestCanBeResumed = function(test) { setTimeout(() => { paused = false; request.resume(); - }, 100); + }, 200); } }); @@ -115,7 +115,7 @@ exports.testPausingRequestPausesTransforms = function(test) { test.ok(this.connection.tokenStreamParser.parser.isPaused()); request.resume(); - }, 100); + }, 200); } }); @@ -147,7 +147,7 @@ exports.testPausedRequestCanBeCancelled = function(test) { setTimeout(() => { this.connection.cancel(); - }, 100); + }, 200); } else if (columns[0].value > 1000) { test.ok(false, 'Received rows after pause'); } @@ -194,5 +194,5 @@ exports.testImmediatelyPausedRequestDoesNotEmitRowsUntilResumed = function(test) setTimeout(() => { paused = false; request.resume(); - }, 100); + }, 200); };