Skip to content

Commit

Permalink
Merge pull request #941 from dhensby/pulls/repeat-connect-calls
Browse files Browse the repository at this point in the history
Allow repeat calls to `ConnectionPool.connect()`
  • Loading branch information
dhensby authored Feb 10, 2020
2 parents 5f93f05 + 4d3a7a8 commit e7560dc
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 61 deletions.
85 changes: 85 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,14 @@ function runStoredProcedure() {
Awaiting or `.then`ing the pool creation is a safe way to ensure that the pool is always ready, without knowing where it
is needed first. In practice, once the pool is created then there will be no delay for the next operation.

As of v6.1.0 you can make repeat calls to `ConnectionPool.connect()` and `ConnectonPool.close()` without an error being
thrown, allowing for the safe use of `mssql.connect().then(...)` throughout your code as well as making multiple calls to
close when your application is shutting down.

The ability to call `connect()` repeatedly is intended to make pool management easier, however it is still recommended
to follow the example above where `connect()` is called once and using the original resolved connection promise.
Repeatedly calling `connect()` when running queries risks running into problems when `close()` is called on the pool.

**ES6 Tagged template literals**

```javascript
Expand All @@ -494,6 +502,83 @@ new sql.ConnectionPool(config).connect().then(pool => {

All values are automatically sanitized against sql injection.

### Managing connection pools

Most applications will only need a single `ConnectionPool` that can be shared throughout the code. To aid the sharing
of a single pool this library exposes a set of functions to access a single global connection. eg:

```js
// as part of your application's boot process

const sql = require('mssql')
const poolPromise = sql.connect()

// during your applications runtime

poolPromise.then(() => {
return sql.query('SELECT 1')
}).then(result => {
console.dir(result)
})

// when your application exits
poolPromise.then(() => {
return sql.close()
})
```

If you require multiple pools per application (perhaps you have many DBs you need to connect to or you want a read-only
pool), then you will need to manage your pools yourself. The best way to do this is to create a shared library file that
can hold references to the pools for you. For example:

```js
const sql = require('mssql')

const pools = {}

// manage a set of pools by name (config will be required to create the pool)
// a pool will be removed when it is closed
async function getPool(name, config) {
if (!Object.prototype.hasOwnProperty.call(pools, name)) {
const pool = new sql.ConnectionPool(config)
const close = pool.close.bind(pool)
pool.close = (...args) => {
delete pools[name]
return close(...args)
}
await pool.connect()
pools[name] = pool
}
return pools[name]
}

// close all pools
function closeAll() {
return Promise.all(Object.values(pools).map((pool) => {
return pool.close()
}))
}

module.exports = {
closeAll,
getPool
}
```

You can then use this library file in your code to get a connected pool when you need it:

```js
const { getPool } = require('./path/to/file')

// run a query
async function runQuery(query, config) {
// pool will always be connected when the promise has resolved - may reject if the connection config is invalid
const pool = await getPool('default', config)
const result = await pool.request().query(query)
return result
}
```

## Configuration

```javascript
Expand Down
66 changes: 50 additions & 16 deletions lib/base/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class ConnectionPool extends EventEmitter {
IDS.add(this, 'ConnectionPool')
debug('pool(%d): created', IDS.get(this))

this._connectStack = []
this._closeStack = []

this._connected = false
this._connecting = false
this._healthy = false
Expand Down Expand Up @@ -103,6 +106,8 @@ class ConnectionPool extends EventEmitter {
_acquire () {
if (!this.pool) {
return shared.Promise.reject(new ConnectionError('Connection not yet open.', 'ENOTOPEN'))
} else if (this.pool.destroyed) {
return shared.Promise.reject(new ConnectionError('Connection is closing', 'ENOTOPEN'))
}

return this.pool.acquire()
Expand Down Expand Up @@ -152,11 +157,14 @@ class ConnectionPool extends EventEmitter {

_connect (callback) {
if (this._connected) {
return setImmediate(callback, new ConnectionError('Database is already connected! Call close before connecting to different database.', 'EALREADYCONNECTED'))
debug('pool(%d): already connected, executing connect callback immediately', IDS.get(this))
return setImmediate(callback, null, this)
}

this._connectStack.push(callback)

if (this._connecting) {
return setImmediate(callback, new ConnectionError('Already connecting to database! Call close before connecting to different database.', 'EALREADYCONNECTING'))
return
}

this._connecting = true
Expand All @@ -167,12 +175,7 @@ class ConnectionPool extends EventEmitter {
debug('pool(%d): connected', IDS.get(this))
this._healthy = true

this._poolDestroy(connection).then(() => {
if (!this._connecting) {
debug('pool(%d): not connecting, exiting silently (was close called before connection established?)', IDS.get(this))
return
}

return this._poolDestroy(connection).then(() => {
// prepare pool
this.pool = new tarn.Pool(
Object.assign({
Expand Down Expand Up @@ -233,12 +236,18 @@ class ConnectionPool extends EventEmitter {

this._connecting = false
this._connected = true

callback(null, this)
})
}).then(() => {
this._connectStack.forEach((cb) => {
setImmediate(cb, null, this)
})
}).catch(err => {
this._connecting = false
callback(err)
this._connectStack.forEach((cb) => {
setImmediate(cb, err)
})
}).then(() => {
this._connectStack = []
})
}

Expand Down Expand Up @@ -285,13 +294,38 @@ class ConnectionPool extends EventEmitter {
*/

_close (callback) {
this._connecting = this._connected = this._healthy = false
// we don't allow pools in a connecting state to be closed because it means there are far too many
// edge cases to deal with
if (this._connecting) {
debug('pool(%d): close called while connecting', IDS.get(this))
setImmediate(callback, new ConnectionError('Cannot close a pool while it is connecting'))
}

if (!this.pool) return setImmediate(callback, null)
if (!this.pool) {
debug('pool(%d): already closed, executing close callback immediately', IDS.get(this))
return setImmediate(callback, null)
}

this._closeStack.push(callback)

if (this.pool.destroyed) return

this._connecting = this._connected = this._healthy = false

this.pool.destroy()
this.pool = null
callback(null)
this.pool.destroy().then(() => {
debug('pool(%d): pool closed, removing pool reference and executing close callbacks', IDS.get(this))
this.pool = null
this._closeStack.forEach(cb => {
setImmediate(cb, null)
})
}).catch(err => {
this.pool = null
this._closeStack.forEach(cb => {
setImmediate(cb, err)
})
}).then(() => {
this._closeStack = []
})
}

/**
Expand Down
4 changes: 4 additions & 0 deletions lib/base/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class Request extends EventEmitter {
this.parameters = {}
}

get paused () {
return this._paused
}

/**
* Generate sql string and set imput parameters from tagged template string.
*
Expand Down
31 changes: 1 addition & 30 deletions lib/global-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const shared = require('./shared')

let globalConnection = null
const globalConnectionHandlers = {}
let onGlobalConnect = []

/**
* Open global connection pool.
Expand Down Expand Up @@ -49,35 +48,7 @@ function connect (config, callback) {

globalConnection.close = globalClose.bind(globalConnection)
}
if (!globalConnection.connected && !globalConnection.connecting) {
globalConnection.connect((err, pool) => {
onGlobalConnect.forEach(cb => {
setImmediate(cb, err, pool)
})
onGlobalConnect = []
})
}
if (globalConnection.connected) {
if (typeof callback === 'function') {
setImmediate(callback, null, globalConnection)
return globalConnection
} else {
return shared.Promise.resolve(globalConnection)
}
} else if (typeof callback === 'function') {
onGlobalConnect.push(callback)
return globalConnection
} else {
return new shared.Promise((resolve, reject) => {
onGlobalConnect.push((err, pool) => {
if (err) {
reject(err)
} else {
resolve(pool)
}
})
})
}
return globalConnection.connect(callback)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/msnodesqlv8/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ class ConnectionPool extends BaseConnectionPool {
}
debug('connection(%d): destroying', IDS.get(tds))
tds.close(() => {
debug('connection(%d): destroyed', IDS.get(tds))
resolve()
})
debug('connection(%d): destroyed', IDS.get(tds))
})
}
}
Expand Down
12 changes: 9 additions & 3 deletions lib/msnodesqlv8/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ class Request extends BaseRequest {
debug('request(%d): cancel', IDS.get(this))
req.cancelQuery(err => {
if (err) debug('request(%d): failed to cancel', IDS.get(this), err)
// this fixes an issue where paused connections don't emit a done event
try {
if (req.isPaused()) req.emit('done')
} catch (err) {
// do nothing
}
})
}

Expand Down Expand Up @@ -355,7 +361,7 @@ class Request extends BaseRequest {

if (row && row.___return___ == null) {
// row with ___return___ col is the last row
if (this.stream) this.emit('row', row)
if (this.stream && !this.paused) this.emit('row', row)
}
}

Expand Down Expand Up @@ -396,7 +402,7 @@ class Request extends BaseRequest {

if (row.___return___ == null) {
// row with ___return___ col is the last row
if (this.stream) this.emit('row', row)
if (this.stream && !this.paused) this.emit('row', row)
}
}

Expand Down Expand Up @@ -503,7 +509,7 @@ class Request extends BaseRequest {

if (row && row.___return___ == null) {
// row with ___return___ col is the last row
if (this.stream) { this.emit('row', row) }
if (this.stream && !this.paused) { this.emit('row', row) }
}
}

Expand Down
Loading

0 comments on commit e7560dc

Please sign in to comment.