Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Fix exception handling in read message path #567

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 29 additions & 24 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -623,29 +623,6 @@ Socket.prototype.send = function(msg, flags, cb) {
return this;
};

Socket.prototype._emitMessage = function (message) {
if (message.length === 1) {
// hot path
this.emit('message', message[0]);
} else {
this.emit.apply(this, ['message'].concat(message));
}
}

Socket.prototype._flushRead = function () {
try {
var message = this._zmq.readv(); // can throw
if (!message) {
return false;
}
// Handle received message immediately to prevent memory leak in driver
this._emitMessage(message)
} catch (error) {
this.emit('error', error); // can throw
}
return true;
};

Socket.prototype._flushWrite = function () {
var batch = this._outgoing.fetch();
if (!batch) {
Expand Down Expand Up @@ -675,7 +652,35 @@ Socket.prototype._flushReads = function() {

this._isFlushingReads = true;

while (this._flushRead());
var message;

do {
try {
message = this._zmq.readv(); // can throw
} catch (error) {
this._isFlushingReads = false;
this.emit('error', error); // can throw
return;
}

if (message) {
try {
// Handle received message immediately to prevent memory leak in driver
if (message.length === 1) {
// hot path
this.emit('message', message[0]);
} else {
this.emit.apply(this, ['message'].concat(message));
}
} catch (error) {
// There might be additional unprocessed messages so continue reading
// after we throw the exception.
process.nextTick(this._flushReads.bind(this));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've been here in the past, and it was the source of all kinds of trouble iirc. Allow me some time to review this in more depth.

this._isFlushingReads = false;
throw error;
}
}
} while (message);

this._isFlushingReads = false;

Expand Down