Skip to content

Commit

Permalink
Add JsonBuffer class
Browse files Browse the repository at this point in the history
- Moved Json parsing implementation from JsonSocket into JsonBuffer for reusability.
- Update tests
  • Loading branch information
JCThePants committed May 21, 2020
1 parent b2160b1 commit b191076
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 69 deletions.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module.exports = {
JsonSocket: require('./libs/class.JsonSocket'),

/* Utils */
JsonBuffer: require('./libs/class.JsonBuffer'),
SocketLimitBuffer: require('./libs/class.SocketLimitBuffer'),
SocketWriter: require('./libs/class.SocketWriter')
};
145 changes: 145 additions & 0 deletions libs/class.JsonBuffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
'use strict';

const
precon = require('@mintpond/mint-precon'),
mu = require('@mintpond/mint-utils');


class JsonBuffer {

/**
* Constructor.
*/
constructor(msgDelimiter) {
precon.opt_string(msgDelimiter, 'msgDelimiter');

const _ = this;
_._msgDelimiter = msgDelimiter;
_._buffer = '';
_._bufferLen = 0;
_._createTimeMs = Date.now();
_._lastAppendTimeMs = 0;
_._lastMessageTimeMs = 0;
_._timeout = 15;
}


/**
* The length in bytes of the data in the buffer.
* @returns {number}
*/
get length() { return this._bufferLen; }

/**
* Get the time in epoch milliseconds that the buffer was created.
* @returns {number}
*/
get createTimeMs() { return this._createTimeMs; }

/**
* Get the last time in epoch milliseconds that the buffer was appended to.
* @returns {number}
*/
get lastAppendTimeMs() { return this._lastAppendTimeMs; }

/**
* Get the last time a message was successfully parsed.
* @returns {number}
*/
get lastMessageTimeMs() { return this._lastMessageTimeMs; }


/**
* Append data to the buffer.
*
* @param dataBuf {string|Buffer}
* @param msgOutputArr {*[]} Output array to put parsed messages into.
* @param [errOutputArr] {*[]} Output array to put parsing errors into. This is only used if a delimiter is
* specified in the constructor.
* @returns {*[]} msgOutputArr
*/
append(dataBuf, msgOutputArr, errOutputArr) {
if (mu.isString(dataBuf)) {
precon.string(dataBuf, 'dataBuf');
}
else {
precon.buffer(dataBuf, 'dataBuf');
}
precon.array(msgOutputArr, 'msgOutputArr');
precon.opt_array(errOutputArr, 'errOutputArr');

const _ = this;
const addedStr = _._addBuffer(dataBuf);

_._lastAppendTimeMs = Date.now();

if (!_._msgDelimiter) {
const message = _._parseJson(_._buffer);
if (!mu.isUndefined(message)) {
msgOutputArr.push(message);
_.reset();
}
return;
}

if (addedStr.lastIndexOf(_._msgDelimiter) !== -1) {

const messagesArr = _._buffer.split(_._msgDelimiter);
const incomplete = _._buffer.slice(-1) === _._msgDelimiter
? ''
: messagesArr.pop();

if (messagesArr.length)
_._lastMessageTimeMs = Date.now();

messagesArr.forEach(strMessage => {

if (!strMessage)
return;

strMessage = strMessage.trim();

const message = _._parseJson(strMessage, errOutputArr);
if (mu.isUndefined(message))
return;

msgOutputArr.push(message);
});

_._buffer = incomplete;
_._bufferLen = Buffer.byteLength(_._buffer, 'utf8');
}
}


reset() {
const _ = this;
_._buffer = '';
_._bufferLen = 0;
}


_addBuffer(dataBuf) {
const _ = this;
const str = dataBuf.toString();
_._buffer += str;
_._bufferLen += dataBuf.length;
return str;
}


_parseJson(json, errOutArr) {
try {
return JSON.parse(json);
}
catch (err) {
errOutArr && errOutArr.push({
error: err,
json: json
});
return undefined;
}
}
}

module.exports = JsonBuffer;
89 changes: 21 additions & 68 deletions libs/class.JsonSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const
precon = require('@mintpond/mint-precon'),
mu = require('@mintpond/mint-utils'),
pu = require('@mintpond/mint-utils').prototypes,
TcpSocket = require('./abstract.TcpSocket');
TcpSocket = require('./abstract.TcpSocket'),
JsonBuffer = require('./class.JsonBuffer');


/**
Expand Down Expand Up @@ -42,9 +43,7 @@ class JsonSocket extends TcpSocket {
const _ = this;
_._shouldEnforceObj = !!args.shouldEnforceObj;

_._buffer = '';
_._bufferLen = 0;
_._errorArr = [];
_._buffer = new JsonBuffer('\n');
}


Expand Down Expand Up @@ -75,87 +74,41 @@ class JsonSocket extends TcpSocket {
const maxBytes = _.portConfig.maxBytes || 10240;

if (dataBuf.length > (_.portConfig.maxMessageBytes || maxBytes)) {
_._resetBuffer();
_._buffer.reset();
_.emit(TcpSocket.EVENT_FLOOD);
return;
}

const addedStr = _._addBuffer(dataBuf);
const errorsArr = [];
const messagesArr = [];
_._buffer.append(dataBuf, messagesArr, errorsArr);

if (_._bufferLen > maxBytes) {
_._resetBuffer();
if (_._buffer.length > maxBytes) {
_._buffer.reset();
_.emit(TcpSocket.EVENT_FLOOD);
return;
}

if (addedStr.lastIndexOf('\n') !== -1) {

const messagesArr = _._buffer.split('\n');
const incomplete = _._buffer.slice(-1) === '\n'
? ''
: messagesArr.pop();

if (_.$isRateLimitExceeded(messagesArr.length))
return;

_.emit(TcpSocket.EVENT_MESSAGES, {messagesArr: messagesArr});

messagesArr.forEach(strMessage => {
if (_.$isRateLimitExceeded(messagesArr.length))
return;

if (!strMessage)
return;
_.emit(TcpSocket.EVENT_MESSAGES, { messagesArr: messagesArr });

strMessage = strMessage.trim();
if (errorsArr.length) {
_.emit(TcpSocket.EVENT_MALFORMED_MESSAGE, errorsArr[0]);
}
else {
messagesArr.forEach(message => {

if (_.shouldEnforceObj && strMessage[0] !== '{') {
if (_.shouldEnforceObj && !mu.isObject(message)) {
_.emit(TcpSocket.EVENT_MALFORMED_MESSAGE, {
message: strMessage,
message: message,
error: new Error('Message is not an object')
});
return;
} else {
_.emit(TcpSocket.EVENT_MESSAGE_IN, {message: message});
}

const message = _._parseJson(strMessage, _._errorArr);
if (mu.isUndefined(message)) {
_.emit(TcpSocket.EVENT_MALFORMED_MESSAGE, {
message: strMessage,
error: _._errorArr.pop()
});
return;
}

_.emit(TcpSocket.EVENT_MESSAGE_IN, { message: message });
});

_._buffer = incomplete;
_._bufferLen = Buffer.byteLength(_._buffer, 'utf8');
}
}


_resetBuffer() {
const _ = this;
_._buffer = '';
_._bufferLen = 0;
}


_addBuffer(dataBuf) {
const _ = this;
const str = dataBuf.toString();
_._buffer += str;
_._bufferLen += dataBuf.length;
return str;
}


_parseJson(json, errOutArr) {
try {
return JSON.parse(json);
}
catch (err) {
errOutArr.push(err);
return undefined;
}
}

Expand Down
Loading

0 comments on commit b191076

Please sign in to comment.