Skip to content

Commit

Permalink
Issue/71 config reload automatic (#76)
Browse files Browse the repository at this point in the history
* Re-connect if server-side closes the connection
* Add sandbox for testing Logstash things
* Add test for Logstash closing the connection
  • Loading branch information
jaakkos authored Mar 27, 2023
1 parent 707f5d9 commit 310db72
Show file tree
Hide file tree
Showing 11 changed files with 1,062 additions and 59 deletions.
35 changes: 30 additions & 5 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,57 +9,81 @@ var _fs = require("fs");
var _tls = _interopRequireDefault(require("tls"));
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
var ConnectionActions;
(function (ConnectionActions) {
ConnectionActions["Initializing"] = "Initializing";
ConnectionActions["Connecting"] = "Connecting";
ConnectionActions["Closing"] = "Closing";
ConnectionActions["Tranferring"] = "Transferring";
ConnectionActions["HandlingError"] = "HandlingError";
})(ConnectionActions || (ConnectionActions = {}));
class Connection {
constructor(options, manager) {
var _options$host, _options$port;
_defineProperty(this, "socket", void 0);
_defineProperty(this, "host", void 0);
_defineProperty(this, "port", void 0);
_defineProperty(this, "manager", void 0);
_defineProperty(this, "action", void 0);
this.action = ConnectionActions.Initializing;
this.manager = manager;
this.host = (_options$host = options === null || options === void 0 ? void 0 : options.host) !== null && _options$host !== void 0 ? _options$host : '127.0.0.1';
this.port = (_options$port = options === null || options === void 0 ? void 0 : options.port) !== null && _options$port !== void 0 ? _options$port : 28777;
}
socketOnError(error) {
this.action = ConnectionActions.HandlingError;
this.manager.emit('connection:error', error);
}
socketOnTimeout() {
var _this$socket;
this.action = ConnectionActions.HandlingError;
this.manager.emit('connection:timeout', (_this$socket = this.socket) === null || _this$socket === void 0 ? void 0 : _this$socket.readyState);
}
socketOnConnect() {
var _this$socket2;
(_this$socket2 = this.socket) === null || _this$socket2 === void 0 ? void 0 : _this$socket2.setKeepAlive(true, 60 * 1000);
this.action = ConnectionActions.Tranferring;
this.manager.emit('connection:connected');
}
socketOnDrain() {
this.manager.emit('connection:drain');
}
socketOnClose(error) {
this.manager.emit('connection:closed', error);
if (this.action === ConnectionActions.Closing) {
this.manager.emit('connection:closed', error);
} else {
this.manager.emit('connection:closed:by-server', error);
}
}
addEventListeners(socket) {
socket.on('drain', this.socketOnDrain.bind(this));
socket.once('error', this.socketOnError.bind(this));
socket.once('timeout', this.socketOnTimeout.bind(this));
socket.once('close', this.socketOnClose.bind(this));
}
close() {
var _this$socket3, _this$socket4;
this.action = ConnectionActions.Closing;
(_this$socket3 = this.socket) === null || _this$socket3 === void 0 ? void 0 : _this$socket3.removeAllListeners();
(_this$socket4 = this.socket) === null || _this$socket4 === void 0 ? void 0 : _this$socket4.destroy();
this.manager.emit('connection:closed');
}
send(message) {
send(message, writeCallback) {
var _this$socket5;
(_this$socket5 = this.socket) === null || _this$socket5 === void 0 ? void 0 : _this$socket5.write(message);
return ((_this$socket5 = this.socket) === null || _this$socket5 === void 0 ? void 0 : _this$socket5.write(Buffer.from(message), writeCallback)) === true;
}
readyToSend() {
return this.socket && this.socket.readyState === 'open';
var _this$socket6;
return ((_this$socket6 = this.socket) === null || _this$socket6 === void 0 ? void 0 : _this$socket6.readyState) === 'open';
}
connect() {
// Place Holder
this.action = ConnectionActions.Connecting;
}
}
exports.Connection = Connection;
class PlainConnection extends Connection {
connect() {
super.connect();
this.socket = new _net.Socket();
this.socket.connect(this.port, this.host);
super.addEventListeners(this.socket);
Expand Down Expand Up @@ -89,6 +113,7 @@ class SecureConnection extends Connection {
return secureContextOptions;
}
connect() {
super.connect();
this.socket = _tls.default.connect(this.port, this.host, this.secureContextOptions);
super.addEventListeners(this.socket);
this.socket.on('secureConnect', super.socketOnConnect.bind(this));
Expand Down
46 changes: 29 additions & 17 deletions lib/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,38 @@ class Manager extends _events.EventEmitter {
_defineProperty(this, "connection", void 0);
_defineProperty(this, "logQueue", void 0);
_defineProperty(this, "options", void 0);
_defineProperty(this, "ssl_enable", void 0);
_defineProperty(this, "useSecureConnection", void 0);
_defineProperty(this, "retries", -1);
_defineProperty(this, "max_connect_retries", void 0);
_defineProperty(this, "timeout_connect_retries", void 0);
_defineProperty(this, "retry_timeout", undefined);
_defineProperty(this, "maxConnectRetries", void 0);
_defineProperty(this, "timeoutConnectRetries", void 0);
_defineProperty(this, "retryTimeout", undefined);
this.options = options;
this.ssl_enable = (options === null || options === void 0 ? void 0 : options.ssl_enable) === true;
this.useSecureConnection = (options === null || options === void 0 ? void 0 : options.ssl_enable) === true;
this.logQueue = new Array();

// Connection retry attributes
this.retries = 0;
this.max_connect_retries = (_options$max_connect_ = options === null || options === void 0 ? void 0 : options.max_connect_retries) !== null && _options$max_connect_ !== void 0 ? _options$max_connect_ : 4;
this.timeout_connect_retries = (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100;
this.maxConnectRetries = (_options$max_connect_ = options === null || options === void 0 ? void 0 : options.max_connect_retries) !== null && _options$max_connect_ !== void 0 ? _options$max_connect_ : 4;
this.timeoutConnectRetries = (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100;
}
addEventListeners() {
this.once('connection:connected', this.onConnected.bind(this));
this.once('connection:closed', this.onConnectionClosed.bind(this));
this.once('connection:closed:by-server', this.onConnectionError.bind(this));
this.once('connection:error', this.onConnectionError.bind(this));
this.once('connection:timeout', this.onConnectionError.bind(this));
this.on('connection:drain', this.flush.bind(this));
}
removeEventListeners() {
this.off('connection:connected', this.onConnected.bind(this));
this.off('connection:closed', this.onConnectionClosed.bind(this));
this.off('connection:closed:by-server', this.onConnectionError.bind(this));
this.off('connection:error', this.onConnectionError.bind(this));
this.off('connection:timeout', this.onConnectionError.bind(this));
this.off('connection:drain', this.flush.bind(this));
}
createConnection() {
if (this.ssl_enable) {
if (this.useSecureConnection) {
return new _connection.SecureConnection(this.options, this);
} else {
return new _connection.PlainConnection(this.options, this);
Expand All @@ -53,7 +57,7 @@ class Manager extends _events.EventEmitter {
this.retries = 0;
this.flush();
}
onConnectionClosed() {
onConnectionClosed(error) {
this.emit('closed');
this.removeEventListeners();
}
Expand All @@ -65,7 +69,7 @@ class Manager extends _events.EventEmitter {

shouldTryToReconnect(error) {
if (this.isRetryableError(error) === true) {
if (this.max_connect_retries < 0 || this.retries < this.max_connect_retries) {
if (this.maxConnectRetries < 0 || this.retries < this.maxConnectRetries) {
return true;
} else {
return false;
Expand All @@ -86,18 +90,18 @@ class Manager extends _events.EventEmitter {
}
retry() {
var _this$connection2;
if (this.retry_timeout) {
clearTimeout(this.retry_timeout);
if (this.retryTimeout) {
clearTimeout(this.retryTimeout);
}
this.emit('retrying');
this.removeEventListeners();
const self = this;
this.once('connection:closed', () => {
self.removeEventListeners();
self.retry_timeout = setTimeout(() => {
self.retryTimeout = setTimeout(() => {
self.connection = undefined;
self.start();
}, self.timeout_connect_retries);
}, self.timeoutConnectRetries);
});
(_this$connection2 = this.connection) === null || _this$connection2 === void 0 ? void 0 : _this$connection2.close();
}
Expand All @@ -122,12 +126,20 @@ class Manager extends _events.EventEmitter {
}
flush() {
this.emit('flushing');
while (this.connection && this.connection.readyToSend() && this.logQueue.length) {
let connectionIsDrained = true;
while (this.logQueue.length && connectionIsDrained && (_this$connection4 = this.connection) !== null && _this$connection4 !== void 0 && _this$connection4.readyToSend()) {
var _this$connection4;
const logEntry = this.logQueue.shift();
if (logEntry) {
const [entry, callback] = logEntry;
this.connection.send(entry + '\n');
callback();
const self = this;
connectionIsDrained = this.connection.send(entry + '\n', error => {
if (error) {
self.logQueue.unshift(logEntry);
} else {
callback();
}
});
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@babel/preset-typescript": "7.18.6",
"@jest/globals": "29.3.1",
"@types/jest": "29.2.2",
"auto-changelog": "*",
"chai": "4.3.6",
"eslint": "8.26.0",
"eslint-config-google": "0.14.0",
Expand All @@ -40,8 +41,7 @@
"ts-jest": "29.0.3",
"ts-loader": "9.4.1",
"typescript": "4.8.4",
"winston": "^0.7.3 || ^1.0.0 || ^2.0.0 ",
"auto-changelog": "*"
"winston": "^0.7.3 || ^1.0.0 || ^2.0.0 "
},
"keywords": [
"logging",
Expand Down
37 changes: 31 additions & 6 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,62 +12,86 @@ import { WinstonModuleTransportOptions } from 'winston';
import { Manager } from './manager';
import { LogstashTransportSSLOptions } from './types';

enum ConnectionActions {
Initializing = "Initializing",
Connecting = "Connecting",
Closing = "Closing",
Tranferring = "Transferring",
HandlingError = "HandlingError"
}

export class Connection {
protected socket: Socket | undefined;
protected host: string;
protected port: number;
protected manager: any;
protected action: ConnectionActions;

constructor(options: WinstonModuleTransportOptions, manager: Manager) {
this.action = ConnectionActions.Initializing;
this.manager = manager;
this.host = options?.host ?? '127.0.0.1';
this.port = options?.port ?? 28777;
}

private socketOnError(error: Error) {
this.action = ConnectionActions.HandlingError;
this.manager.emit('connection:error', error);
}

private socketOnTimeout() {
this.action = ConnectionActions.HandlingError;
this.manager.emit('connection:timeout', this.socket?.readyState);
}

protected socketOnConnect() {
this.socket?.setKeepAlive(true, 60 * 1000);
this.action = ConnectionActions.Tranferring;
this.manager.emit('connection:connected');
}

protected socketOnDrain() {
this.manager.emit('connection:drain');
}

private socketOnClose(error: Error) {
this.manager.emit('connection:closed', error);
if (this.action === ConnectionActions.Closing) {
this.manager.emit('connection:closed', error);
} else {
this.manager.emit('connection:closed:by-server', error);
}
}

protected addEventListeners(socket: Socket) {
socket.on('drain', this.socketOnDrain.bind(this));
socket.once('error', this.socketOnError.bind(this));
socket.once('timeout', this.socketOnTimeout.bind(this));
socket.once('close', this.socketOnClose.bind(this));
}

close() {
this.action = ConnectionActions.Closing;
this.socket?.removeAllListeners();
this.socket?.destroy();
this.manager.emit('connection:closed');
}

send(message: string) {
this.socket?.write(message);
send(message: string, writeCallback: (error?: Error) => void): boolean {
return this.socket?.write(Buffer.from(message), writeCallback) === true;
}

readyToSend() {
return this.socket && this.socket.readyState === 'open';
readyToSend(): boolean {
return this.socket?.readyState === 'open';
}

connect() {
// Place Holder
this.action = ConnectionActions.Connecting;
}
}

export class PlainConnection extends Connection {
connect() {
super.connect();
this.socket = new Socket();
this.socket.connect(this.port, this.host);
super.addEventListeners(this.socket);
Expand Down Expand Up @@ -102,6 +126,7 @@ export class SecureConnection extends Connection {
}

connect() {
super.connect();
this.socket = tls.connect(this.port,
this.host,
this.secureContextOptions);
Expand Down
Loading

0 comments on commit 310db72

Please sign in to comment.