Skip to content

Commit

Permalink
Add e2e test setup for working with winston 3.x support while keeping…
Browse files Browse the repository at this point in the history
… 2.x support (#60)

* add basic setup for testing with real logstash
* fix some issues noticed when setting up e2e environment
  • Loading branch information
jaakkos authored Nov 4, 2022
1 parent 520dee1 commit 8c133e6
Show file tree
Hide file tree
Showing 15 changed files with 2,096 additions and 280 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ results
npm-debug.log
test/runner.js

test/support/logstash/*
test/support/logstash/*

test-bench/logstash/logstash/output/*
265 changes: 135 additions & 130 deletions lib/connection.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable require-jsdoc */
const net = require('net');
const fs = require('fs');
const tls = require('tls');
Expand All @@ -10,153 +11,157 @@ const ECONNREFUSED_REGEXP = /ECONNREFUSED/;
* @param {object} options
* @param {function} onErrorHook
*/
function Connection(options, onErrorHook) {
this.onErrorHook = onErrorHook;
this.host = options.host || '127.0.0.1';
this.port = options.port || 28777;

// Connection state flags
this.connecting = false;
this.terminating = false;
this.connected = false;

// Connection retry attributes
this.tryReconnect = true;
this.retries = -1;
this.max_connect_retries =
('number' === typeof options.max_connect_retries) ?
options.max_connect_retries : 4;
this.timeout_connect_retries =
('number' === typeof options.timeout_connect_retries) ?
options.timeout_connect_retries : 100;

// SSL Settings
this.ssl_enable = options.ssl_enable || false;
this.secureContextOptions = this.ssl_enable ?
Connection.createSecureContextOptions(options) : null;
}
class Connection {
constructor(options, onErrorHook) {
this.onErrorHook = onErrorHook;
this.host = options.host || '127.0.0.1';
this.port = options.port || 28777;

// Connection state flags
this.connecting = false;
this.terminating = false;
this.connected = false;

Connection.createSecureContextOptions = function(options) {
const sslKey = options.ssl_key || '';
const sslCert = options.ssl_cert || '';
const ca = options.ca || '';
const sslPassphrase = options.ssl_passphrase || '';
const rejectUnauthorized = options.rejectUnauthorized === true;

const secureContextOptions = {
key: sslKey ? fs.readFileSync(sslKey) : null,
cert: sslCert ? fs.readFileSync(sslCert) : null,
passphrase: sslPassphrase ? sslPassphrase : null,
rejectUnauthorized: rejectUnauthorized === true,
ca: ca ? (function(caList) {
const caFilesList = [];

caList.forEach(function(filePath) {
caFilesList.push(fs.readFileSync(filePath));
});

return caFilesList;
}(ca)) : null,
};

return secureContextOptions;
};

Connection.prototype.socketOnError = function(error) {
this.connecting = false;
this.connected = false;

if (typeof (this.socket) !== 'undefined' && this.socket != null) {
this.socket.destroy();
this.socket = null;
// Connection retry attributes
this.tryReconnect = true;
this.retries = -1;
this.max_connect_retries =
('number' === typeof options.max_connect_retries) ?
options.max_connect_retries : 4;
this.timeout_connect_retries =
('number' === typeof options.timeout_connect_retries) ?
options.timeout_connect_retries : 100;
}

if (!ECONNREFUSED_REGEXP.test(error.message)) {
this.tryReconnect = false;
this.onErrorHook(error);
}
};
socketOnError(error) {
this.connecting = false;
this.connected = false;

Connection.prototype.socketOnTimeout = function() {
if (this.socket.readyState !== 'open') {
this.socket.destroy();
if (typeof (this.socket) !== 'undefined' && this.socket != null) {
this.socket.destroy();
}

if (!ECONNREFUSED_REGEXP.test(error.message)) {
this.tryReconnect = false;
this.onErrorHook(error);
}
}
socketOnTimeout() {
if (this.socket.readyState !== 'open') {
this.socket.destroy();
}
}
socketOnConnect() {
this.retries = 0;
this.connecting = false;
this.connected = true;
this.socket.setKeepAlive(true, 60 * 1000);
this.actionOnConnect();
}
};

Connection.prototype.socketOnSecureConnect = function() {
//
};
socketOnClose(error) {
this.connected = false;

Connection.prototype.socketOnConnect = function() {
this.retries = 0;
};
if (this.terminating) {
return;
}

Connection.prototype.socketOnClose = function(error) {
this.connected = false;
if (this.max_connect_retries < 0 ||
this.retries < this.max_connect_retries) {
if (!this.connecting) {
setTimeout(function() {
this.connect();
}.bind(this), this.timeout_connect_retries);
}
} else {
this.onErrorHook(
new Error('Max retries reached, transport in silent mode, OFFLINE'));
}
}
bindCommonEventListeners(socket) {
socket.on('error', this.socketOnError.bind(this));
socket.on('timeout', this.socketOnTimeout.bind(this));
socket.on('close', this.socketOnClose.bind(this));
}

if (this.terminating) {
return;
connect(onConnection) {
this.retries++;
this.connecting = true;
this.terminating = false;
this.actionOnConnect = onConnection || function() { };
}

if (this.max_connect_retries < 0 || this.retries < this.max_connect_retries) {
if (!this.connecting) {
setTimeout(function() {
this.connect();
}.bind(this), this.timeout_connect_retries);
close() {
this.terminating = true;
if (this.socket) {
this.socket.end();
this.socket.destroy();
this.connected = false;
}
} else {
this.silent = true;
this.onErrorHook(
new Error('Max retries reached, transport in silent mode, OFFLINE'));
}
};

Connection.prototype.bindEventListeners = function(socket) {
socket.on('error', this.socketOnError.bind(this));
socket.on('timeout', this.socketOnTimeout.bind(this));
socket.on('connect', this.socketOnConnect.bind(this));
socket.on('secureConnect', this.socketOnSecureConnect.bind(this));
socket.on('close', this.socketOnClose.bind(this));
};

Connection.prototype.onConnected = function() {
this.connecting = false;
this.connected = true;
this.socket.setKeepAlive(true, 60 * 1000);
this.actionOnConnect();
};

Connection.prototype.connect = function(onConnection) {
this.retries++;
this.connecting = true;
this.terminating = false;
this.actionOnConnect = onConnection || function() { };

if (this.ssl_enable) {
this.socket = tls.connect(this.port,
this.host,
this.secureContextOptions,
this.onConnected.bind(this));
} else {

send(message) {
this.socket.write(message);
}

readyToSend() {
return this.socket && this.socket.readyState === 'open';
}
}

class PlainConnection extends Connection {
connect(onConnection) {
super.connect(onConnection);
this.socket = new net.Socket();
this.socket.connect(this.port, this.host, this.onConnected.bind(this));
this.socket.connect(this.port, this.host);
super.bindCommonEventListeners(this.socket);
this.socket.on('connect', super.socketOnConnect.bind(this));
}
}

this.bindEventListeners(this.socket);
};
class SecureConnection extends Connection {
constructor(options, onErrorHook) {
super(options, onErrorHook);
// SSL Settings
this.ssl_enable = options.ssl_enable || false;
this.secureContextOptions = this.ssl_enable ?
SecureConnection.createSecureContextOptions(options) : null;
}

Connection.prototype.close = function() {
this.terminating = true;
if (this.connected && this.socket) {
this.socket.end();
this.socket.destroy();
this.socket = null;
this.connected = false;
static createSecureContextOptions(options) {
const sslKey = options.ssl_key || '';
const sslCert = options.ssl_cert || '';
const ca = options.ca || '';
const sslPassphrase = options.ssl_passphrase || '';
const rejectUnauthorized = options.rejectUnauthorized === true;

const secureContextOptions = {
key: sslKey ? fs.readFileSync(sslKey) : null,
cert: sslCert ? fs.readFileSync(sslCert) : null,
passphrase: sslPassphrase ? sslPassphrase : null,
rejectUnauthorized: rejectUnauthorized === true,
ca: ca ? (function(caList) {
const caFilesList = [];

caList.forEach(function(filePath) {
caFilesList.push(fs.readFileSync(filePath));
});

return caFilesList;
}(ca)) : null,
};

return secureContextOptions;
}
};

Connection.prototype.send = function(message) {
this.socket.write(message + '\n');
};
connect(onConnection) {
super.connect(onConnection);
this.socket = tls.connect(this.port,
this.host,
this.secureContextOptions);
super.bindCommonEventListeners(this.socket);
this.socket.on('secureConnect', super.socketOnConnect.bind(this));
}
}

module.exports = Connection;
module.exports = {SecureConnection, PlainConnection};
11 changes: 9 additions & 2 deletions lib/default-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ module.exports = function(level, msg, meta, self) {
return common.log({
level: level,
message: msg,
node_name: self.node_name,
meta: meta,
json: self.json,
logstash: self.logstash,
colorize: self.colorize,
prettyPrint: self.prettyPrint,
timestamp: self.timestamp,
json: true,
showLevel: self.showLevel,
stringify: self.stringify,
label: self.label,
depth: self.depth,
formatter: self.formatter,
humanReadableUnhandledException: self.humanReadableUnhandledException,
});
};
42 changes: 42 additions & 0 deletions lib/manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* eslint-disable require-jsdoc */
const Connection = require('./connection');

module.exports = class Manager {
constructor(options, onError) {
this.host = options.host || '127.0.0.1';
this.port = options.port || 28777;
this.connectionStarted = false;
this.logQueue = [];

if (options.ssl_enable) {
this.connection = new Connection.SecureConnection(options, onError);
} else {
this.connection = new Connection.PlainConnection(options, onError);
}
}

start() {
if (!this.connectionStarted) {
this.connection.connect(this.flush.bind(this));
this.connectionStarted = true;
}
}

log(entry, callback) {
this.logQueue.push([entry, callback]);
process.nextTick(this.flush.bind(this));
}

close() {
this.flush();
this.connection.close();
}

flush() {
while (this.connection.readyToSend() && this.logQueue.length) {
const [entry, callback] = this.logQueue.shift();
this.connection.send(entry + '\n');
callback();
}
}
};
Loading

0 comments on commit 8c133e6

Please sign in to comment.