Skip to content

Commit

Permalink
fix: only unassign currentMessage after fully processing it
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurschreiber committed May 25, 2019
1 parent 826f6a0 commit e7b6164
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/incoming-message-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class IncomingMessageStream extends Transform {
}

if (packet.isLast()) {
this.currentMessage = undefined;
// Wait until the current message was fully processed before we
// continue processing any remaining messages.
message.once('end', () => {
this.currentMessage = undefined;
this.processBufferedData(callback);
});
message.end(packet.data());
Expand Down
69 changes: 69 additions & 0 deletions test/unit/incoming-message-stream-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,73 @@ describe('IncomingMessageStream', function() {
});
});
});

it("correctly handles the last package coming in after the stream was paused", function(done) {
const packetData = Buffer.from('test1234');
const packetHeader = Buffer.alloc(8);

let offset = 0;
offset = packetHeader.writeUInt8(0x11, offset);
offset = packetHeader.writeUInt8(0x00, offset);
offset = packetHeader.writeUInt16BE(8 + packetData.length, offset);
offset = packetHeader.writeUInt16BE(0x0000, offset);
offset = packetHeader.writeUInt8(1, offset);
packetHeader.writeUInt8(0x00, offset);

const firstPacket = Buffer.concat([packetHeader, packetData]);

offset = 0;
offset = packetHeader.writeUInt8(0x11, offset);
offset = packetHeader.writeUInt8(0x01, offset);
offset = packetHeader.writeUInt16BE(8 + packetData.length, offset);
offset = packetHeader.writeUInt16BE(0x0000, offset);
offset = packetHeader.writeUInt8(1, offset);
packetHeader.writeUInt8(0x00, offset);

const secondPacket = Buffer.concat([packetHeader, packetData]);

const incoming = new IncomingMessageStream(new Debug());

const result = new BufferList(function(err, res) {
if (err) {
return done(err);
}

assert.deepEqual(res, Buffer.concat([ packetData, packetData ]));

done();
});

let messageEnded = false;
incoming.on('data', function(message) {
assert.instanceOf(message, Message);

message.on('end', function() {
messageEnded = true;
});

message.pipe(result);
});

incoming.write(firstPacket, function() {
const writtenData = result.slice();

assert.strictEqual(writtenData.length, 8);
assert.deepEqual(writtenData, packetData);

incoming.pause();

incoming.write(secondPacket, function() {
const writtenData = result.slice();

assert.strictEqual(writtenData.length, 16);
assert.deepEqual(writtenData, Buffer.concat([ packetData, packetData ]));

assert.strictEqual(messageEnded, true);
});

assert.isFalse(messageEnded);
incoming.resume();
});
});
});

0 comments on commit e7b6164

Please sign in to comment.