diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ba8f27..cc68f8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 1.0.2 (2022-11-09) + +Features: + +- Update README.md + +Bugfixes: + +- Fix retry timeout which was removed on 1.x release. + ## 1.0.0 (2022-11-02) Features: diff --git a/LICENSE b/LICENSE index f414eb0..2f1e970 100644 --- a/LICENSE +++ b/LICENSE @@ -16,4 +16,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file +SOFTWARE. diff --git a/README.md b/README.md index 3fa88c5..3076efb 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,10 @@ A [Logstash TCP][0] transport for [winston][1]. ## FAQ +### What configuration options are available? + +See documentation from [docs/configuration](docs/configuration.md) + ### How to keep the connection open while Logstash is restarting? It's possible to set max_connect_retries to -1 (infinite) so the client keeps trying to connect to the Logstash. So when Logstash is restarted the retry logic will reconnect when it comes back online. @@ -79,15 +83,15 @@ It's possible to set max_connect_retries to -1 (infinite) so the client keeps tr ## Run Tests ```shell - $ npm test + npm test ``` ## Run integration tests with Logstash ```shell - $ cd test-bench/winston-3x - $ docker-compose up -d - $ npm test + cd test-bench/winston-3x + docker-compose up -d + npm test ``` ## Inspiration diff --git a/docs/configuration.md b/docs/configuration.md index 57baf18..935387a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1,36 +1,36 @@ ## Configuration * `host` - * The host location of the logstash server. - * Default: `127.0.0.1` + * The host location of the logstash server. + * Default: `127.0.0.1` * `port` - * The host port to connect. - * Default: `28777` + * The host port to connect. + * Default: `28777` * `max_connect_retries` - * Max number of attempts to reconnect to logstash before going into silence. - * `-1` means retry forever. - * Default: `4` + * Max number of attempts to reconnect to logstash before going into silence. + * `-1` means retry forever. + * Default: `4` * `timeout_connect_retries` - * The number of ms between each retry for a reconnect to logstash . - * Default: `100` + * The number of ms between each retry for a reconnect to logstash . + * Default: `100` * `ssl_enable` - * Enable SSL transfer of logs to logstash. - * Default: `false` + * Enable SSL transfer of logs to logstash. + * Default: `false` * `ssl_key` - * Path location of client private key. - * Only needed if SSL verify is required on logstash. - * No default + * Path location of client private key. + * Only needed if SSL verify is required on logstash. + * No default * `ssl_cert` - * Path location of client public certificate. - * Only needed if SSL verify is required on logstash. - * No default + * Path location of client public certificate. + * Only needed if SSL verify is required on logstash. + * No default * `ssl_passphrase` - * Passphrase for the SSL key. - * Only needed if the certificate has a passphrase. - * No default + * Passphrase for the SSL key. + * Only needed if the certificate has a passphrase. + * No default * `rejectUnauthorized` - * If true the server will reject any connection which is not authorized with the list of supplied CAs. - * Default true + * If true the server will reject any connection which is not authorized with the list of supplied CAs. + * Default true * `strip_colors` - * Strip colors from messages and metadata - * Default: `false` + * Strip colors from messages and metadata + * Default: `false` diff --git a/lib/connection.js b/lib/connection.js index b4835e6..13d0295 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -19,31 +19,31 @@ class Connection { socketOnError(error) { this.manager.emit('connection:error', error); } + socketOnTimeout() { - if (this.socket.readyState !== 'open') { - this.socket.destroy(); - } + this.manager.emit('connection:timeout', this.socket.readyState); } + socketOnConnect() { this.socket.setKeepAlive(true, 60 * 1000); - this.manager.emit('connection:connect'); + this.manager.emit('connection:connected'); } socketOnClose(error) { - this.manager.emit('connection:close', error); + this.manager.emit('connection:closed', error); } addEventListeners(socket) { - socket.on('error', this.socketOnError.bind(this)); - socket.on('timeout', this.socketOnTimeout.bind(this)); - socket.on('close', this.socketOnClose.bind(this)); + socket.once('error', this.socketOnError.bind(this)); + socket.once('timeout', this.socketOnTimeout.bind(this)); + socket.once('close', this.socketOnClose.bind(this)); } close() { - if (this.socket) { - this.socket.end(); - this.socket.destroy(); - } + this.socket.removeAllListeners(); + this.socket.destroy(); + this.manager.emit('connection:closed'); + this.socket = null; } send(message) { @@ -67,10 +67,8 @@ class PlainConnection extends Connection { class SecureConnection extends Connection { constructor(options, manager) { super(options, manager); - // SSL Settings - this.ssl_enable = options.ssl_enable || false; - this.secureContextOptions = this.ssl_enable ? - SecureConnection.createSecureContextOptions(options) : null; + this.secureContextOptions = + SecureConnection.createSecureContextOptions(options); } static createSecureContextOptions(options) { diff --git a/lib/manager.js b/lib/manager.js index 4aee0cd..67b15c7 100644 --- a/lib/manager.js +++ b/lib/manager.js @@ -8,12 +8,13 @@ module.exports = class Manager extends EventEmitter { super(); this.host = options.host || '127.0.0.1'; this.port = options.port || 28777; - this.connectionStarted = false; + this.connection = null; this.closing = false; this.logQueue = []; + this.options = options; + this.ssl_enable = options.ssl_enable; // Connection retry attributes - this.tryReconnect = true; this.retries = -1; this.max_connect_retries = ('number' === typeof options.max_connect_retries) ? @@ -21,27 +22,40 @@ module.exports = class Manager extends EventEmitter { this.timeout_connect_retries = ('number' === typeof options.timeout_connect_retries) ? options.timeout_connect_retries : 100; + } - this.on('connection:connect', this.onConnection.bind(this)); - this.on('connection:close', this.onConnectionClose.bind(this)); + addEventListeners() { + this.on('connection:connected', this.onConnected.bind(this)); + this.on('connection:closed', this.onConnectionClosed.bind(this)); this.on('connection:error', this.onConnectionError.bind(this)); + this.on('connection:timeout', this.onConnectionError.bind(this)); + } + + removeEventListeners() { + this.off('connection:connected', this.onConnected.bind(this)); + this.off('connection:closed', this.onConnectionClosed.bind(this)); + this.off('connection:error', this.onConnectionError.bind(this)); + this.off('connection:timeout', this.onConnectionError.bind(this)); + } - if (options.ssl_enable) { - this.connection = new Connection.SecureConnection(options, this); + createConnection() { + if (this.ssl_enable) { + return new Connection.SecureConnection(this.options, this); } else { - this.connection = new Connection.PlainConnection(options, this); + return new Connection.PlainConnection(this.options, this); } } - onConnection() { + onConnected() { + this.emit('connected'); this.retries = 0; this.flush(); } - onConnectionClose() { - if (this.tryReconnect === true) { - this.connection.connect(this.flush.bind(this)); - } + onConnectionClosed() { + this.emit('closed'); + this.removeEventListeners(); + this.connection = null; } isRetryableError(error) { @@ -50,34 +64,51 @@ module.exports = class Manager extends EventEmitter { return true; // !ECONNREFUSED_REGEXP.test(error.code); } - tryToReconnect(error) { - if (this.isRetryableError(error) === true && - this.closing === false) { + shouldTryToReconnect(error) { + if (this.isRetryableError(error) === true) { if (this.max_connect_retries < 0 || this.retries < this.max_connect_retries) { return true; } else { return false; } + } else { + return false; } } onConnectionError(error) { - this.retries++; - this.tryReconnect = this.tryToReconnect(error); - - if (this.tryReconnect === false) { + if (this.shouldTryToReconnect(error)) { + this.removeEventListeners(); + this.connection.close(); this.emit('error', new Error('Max retries reached, transport in silent mode, OFFLINE')); - this.closing = true; - this.connection.close(); + } else { + this.retry(); } } + retry() { + this.emit('retrying'); + this.removeEventListeners(); + const self = this; + this.once('connection:closed', () => { + self.connection = null; + self.removeEventListeners(); + setInterval(() => { + self.start(); + }, + self.timeout_connect_retries); + }); + this.connection.close(); + } + start() { - if (!this.connectionStarted) { + if (!this.connection) { + this.retries++; + this.connection = this.createConnection(); + this.addEventListeners(); this.connection.connect(); - this.connectionStarted = true; } } @@ -87,13 +118,17 @@ module.exports = class Manager extends EventEmitter { } close() { + this.emit('closing'); this.flush(); - this.closing = true; + this.removeEventListeners(); this.connection.close(); } flush() { - while (this.connection.readyToSend() && this.logQueue.length) { + this.emit('flushing'); + while (this.connection && + this.connection.readyToSend() && + this.logQueue.length) { const [entry, callback] = this.logQueue.shift(); this.connection.send(entry + '\n'); callback(); diff --git a/package.json b/package.json index cd2d706..caf4c74 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "winston-logstash", - "version": "1.0.1", + "version": "1.0.2", "description": "A Logstash transport for winston", "main": "./lib/winston-logstash", "homepage": "https://github.com/jaakkos/winston-logstash", @@ -8,6 +8,9 @@ "test": "mocha", "lint": "eslint ." }, + "files": [ + "lib/" + ], "engines": { "node": ">=6" }, diff --git a/test/winston-logstash_test.js b/test/winston-logstash_test.js index 2e4c2fe..1368034 100644 --- a/test/winston-logstash_test.js +++ b/test/winston-logstash_test.js @@ -9,17 +9,33 @@ const fs = require('fs'); const winston = require('winston'); const timekeeper = require('timekeeper'); const freezedTime = new Date(1330688329321); -let port = 28777; +const port = 28777; chai.config.includeStack = true; require('../lib/winston-logstash'); describe('winston-logstash transport', function() { + const openSockets = new Set(); + + const forceCloseAllSocket = () => { + const openSocketsCount = openSockets.size; + for (const socket of openSockets.values()) { + socket.destroy(); + } + + return openSocketsCount; + }; + function createTestServer(port, onData) { const server = net.createServer(function(socket) { - socket.on('end', function() {}); - socket.on('data', onData); + openSockets.add(socket); + socket.on('close', () => { + openSockets.delete(socket); + }); + socket.on('data', (data) => { + onData(data); + }); }); server.listen(port); @@ -42,9 +58,14 @@ describe('winston-logstash transport', function() { ca: [fs.readFileSync(__dirname + '/support/ssl/ca.cert')], }; - const server = tls.createServer(serverOptions, function(socket) { - socket.on('end', function() { }); - socket.on('data', onData); + const server = tls.createServer(serverOptions, (socket) => { + openSockets.add(socket); + socket.on('end', () => { + openSockets.delete(socket); + }); + socket.on('data', (data) => { + onData(data); + }); }); server.listen(port, 'localhost'); @@ -75,15 +96,28 @@ describe('winston-logstash transport', function() { }); } + function setup(done, port, timekeeper) { + port++; + timekeeper.freeze(freezedTime); + done(); + } + + function tearDown(done, logger, timekeeper, testServer) { + logger.close(); + forceCloseAllSocket(); + timekeeper.reset(); + testServer.close(() => { + testServer = null; + logger = null; + done(); + }); + } + describe('with logstash server', function() { let testServer; let logger; - beforeEach(function(done) { - port++; - timekeeper.freeze(freezedTime); - done(); - }); + beforeEach((done) => setup(done, port, timekeeper)); it('send logs over TCP as valid json', function(done) { let response; @@ -148,29 +182,15 @@ describe('winston-logstash transport', function() { }); // Teardown - afterEach(function(done) { - if (logger) { - logger.close(); - } - timekeeper.reset(); - if (testServer) { - testServer.close(() => { - testServer = null; - logger = null; - done(); - }); - } + afterEach((done) => { + tearDown(done, logger, timekeeper, testServer); }); }); describe('with secured logstash server', function() { let testServer; let logger; - beforeEach(function(done) { - port++; - timekeeper.freeze(freezedTime); - done(); - }); + beforeEach((done) => setup(done, port, timekeeper)); it('send logs over SSL secured TCP as valid json', function(done) { let response; @@ -244,18 +264,8 @@ describe('winston-logstash transport', function() { }); // Teardown - afterEach(function(done) { - if (logger) { - logger.close(); - } - timekeeper.reset(); - if (testServer) { - testServer.close(function() { - testServer = null; - logger = null; - done(); - }); - } + afterEach((done) => { + tearDown(done, logger, timekeeper, testServer); }); });