Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Fix exception handling in read message path #567

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Fix exception handling in read message path #567

wants to merge 1 commit into from

Conversation

jkryl
Copy link

@jkryl jkryl commented Sep 28, 2016

The problem is if exception is thrown from handler of message event on zmq socket:

  • error event on zmq socket is emitted even though it's not a socket error but just an exception in event's handler (in user's code) and propagation of exception from user's code is stopped
  • the socket stops emitting message events after it, because reading of messages did not finish and if new messages arrive the state does not change

I have put together two simple test programs to demonstrate the problem. Pubber emits event every second and subber receives it and throws exception if event number is 10.

// pubber.js
const zmq = require('zmq');
const sock = zmq.socket('pub');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');

var i = 1;

setInterval(function() {
    console.log('sending message ' + i);
    sock.send(['event', (i++).toString()]);
}, 1000);
// subber.js
const zmq = require('zmq');
const sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:3000');
sock.subscribe('event');
console.log('Subscriber connected to port 3000');

sock.on('message', function(name, data) {
    data = parseInt(data);
    console.log('received', name.toString(), data);

    if (data == 10) {
        throw Error('Exception from message handler');
    }
});

sock.on('error', function(err) {
    console.log('In socket error handler:', err.toString());
});

process.on('uncaughtException', function(err) {
    console.log('Uncaught exception:', err.toString());
});

The fix is to distinguish between exceptions thrown from _zmq.readv() (from zmq bindings) and exceptions thrown from user provided callbacks. For the former case we behave like before. For the later case we reschedule reading from socket and re-throw exception so that it can get handled as normally.

} catch (error) {
// There might be additional unprocessed messages so continue reading
// after we throw the exception.
process.nextTick(this._flushReads.bind(this));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've been here in the past, and it was the source of all kinds of trouble iirc. Allow me some time to review this in more depth.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants