Skip to content

Commit

Permalink
Merge pull request #350 from furstenheim/master
Browse files Browse the repository at this point in the history
Adapt to new Buffer interface
  • Loading branch information
squaremo authored Jun 15, 2017
2 parents 64139fe + cac3f4e commit 31bb2a9
Show file tree
Hide file tree
Showing 33 changed files with 110 additions and 100 deletions.
11 changes: 6 additions & 5 deletions bin/generate-defs.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ println(
'*/');

println("'use strict';"); nl();

println('var Buffer = require("safe-buffer").Buffer;');
nl()
println('var codec = require("./codec");');
println('var ints = require("buffer-more-ints");');
println('var encodeTable = codec.encodeTable;');
println('var decodeFields = codec.decodeFields;');
nl();

println('var SCRATCH = new Buffer(4096);');
println('var SCRATCH = Buffer.alloc(4096);');
println('var EMPTY_OBJECT = Object.freeze({});');

println('module.exports.constants = %s',
Expand Down Expand Up @@ -238,7 +239,7 @@ function typeDesc(t) {
function defaultValueRepr(arg) {
switch (arg.type) {
case 'longstr':
return format("new Buffer(%s)", JSON.stringify(arg.default));
return format("Buffer.from(%s)", JSON.stringify(arg.default));
default:
// assumes no tables as defaults
return JSON.stringify(arg.default);
Expand Down Expand Up @@ -364,7 +365,7 @@ function encoderFn(method) {
}
}

println('var buffer = new Buffer(%d + varyingSize);', fixedSize);
println('var buffer = Buffer.alloc(%d + varyingSize);', fixedSize);

println('buffer[0] = %d;', constants.FRAME_METHOD);
println('buffer.writeUInt16BE(channel, 1);');
Expand Down Expand Up @@ -587,7 +588,7 @@ function encodePropsFn(props) {
println('}');
}

println('var buffer = new Buffer(%d + varyingSize);', fixedSize);
println('var buffer = Buffer.alloc(%d + varyingSize);', fixedSize);

println('buffer[0] = %d', constants.FRAME_HEADER);
println('buffer.writeUInt16BE(channel, 1);');
Expand Down
4 changes: 2 additions & 2 deletions examples/headers.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ function bindAndConsume(ch, ex, q) {

function send(ch, ex) {
// The headers for a message are given as an option to `publish`:
ch.publish(ex.exchange, '', new Buffer('hello'), {headers: {baz: 'boo'}});
ch.publish(ex.exchange, '', new Buffer('world'), {headers: {foo: 'bar'}});
ch.publish(ex.exchange, '', Buffer.from('hello'), {headers: {baz: 'boo'}});
ch.publish(ex.exchange, '', Buffer.from('world'), {headers: {foo: 'bar'}});
}
2 changes: 1 addition & 1 deletion examples/receive_generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ co(function* () {
const msg = 'Hello World!';
const channel = yield conn.createChannel();
yield channel.assertQueue(q);
channel.sendToQueue(q, new Buffer(msg));
channel.sendToQueue(q, Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
// consume the message
yield channel.consume(q, myConsumer, { noAck: true });
Expand Down
2 changes: 1 addition & 1 deletion examples/send_generators.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ co(function* () {

yield channel.assertQueue(q);

channel.sendToQueue(q, new Buffer(msg));
channel.sendToQueue(q, Buffer.from(msg));

// if message has been nacked, this will result in an error (rejected promise);
yield channel.waitForConfirms();
Expand Down
2 changes: 1 addition & 1 deletion examples/ssl.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ var open = amqp.connect('amqps://localhost', opts);
open.then(function(conn) {
process.on('SIGINT', conn.close.bind(conn));
return conn.createChannel().then(function(ch) {
ch.sendToQueue('foo', new Buffer('Hello World!'));
ch.sendToQueue('foo', Buffer.from('Hello World!'));
});
}).then(null, console.warn);
2 changes: 1 addition & 1 deletion examples/tutorials/callback_api/emit_log.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function on_connect(err, conn) {
ch.assertExchange(ex, 'fanout', {durable: false});
var msg = process.argv.slice(2).join(' ') ||
'info: Hello World!';
ch.publish(ex, '', new Buffer(msg));
ch.publish(ex, '', Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
}
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/callback_api/emit_log_direct.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ function on_connect(err, conn) {
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertExchange(ex, 'direct', exopts, function(err, ok) {
ch.publish(ex, severity, new Buffer(message));
ch.publish(ex, severity, Buffer.from(message));
ch.close(function() { conn.close(); });
});
}
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/callback_api/emit_log_topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function on_connect(err, conn) {
conn.createChannel(function(err, ch) {
ch.assertExchange(ex, 'topic', exopts, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.publish(ex, key, new Buffer(message));
ch.publish(ex, key, Buffer.from(message));
console.log(" [x] Sent %s:'%s'", key, message);
ch.close(function() { conn.close(); });
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/callback_api/new_task.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function on_connect(err, conn) {
ch.assertQueue(q, {durable: true}, function(err, _ok) {
if (err !== null) return bail(err, conn);
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/callback_api/rpc_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function on_connect(err, conn) {
var queue = ok.queue;
ch.consume(queue, maybeAnswer, {noAck:true});
console.log(' [x] Requesting fib(%d)', n);
ch.sendToQueue('rpc_queue', new Buffer(n.toString()), {
ch.sendToQueue('rpc_queue', Buffer.from(n.toString()), {
replyTo: queue, correlationId: correlationId
});
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/callback_api/rpc_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ function on_connect(err, conn) {
var n = parseInt(msg.content.toString());
console.log(' [.] fib(%d)', n);
ch.sendToQueue(msg.properties.replyTo,
new Buffer(fib(n).toString()),
Buffer.from(fib(n).toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/callback_api/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function on_connect(err, conn) {
if (err !== null) return bail(err, conn);
ch.assertQueue(q, {durable: false}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.sendToQueue(q, new Buffer(msg));
ch.sendToQueue(q, Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/emit_log.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ amqp.connect('amqp://localhost').then(function(conn) {
'info: Hello World!';

return ok.then(function() {
ch.publish(ex, '', new Buffer(message));
ch.publish(ex, '', Buffer.from(message));
console.log(" [x] Sent '%s'", message);
return ch.close();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/emit_log_direct.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ amqp.connect('amqp://localhost').then(function(conn) {
var ok = ch.assertExchange(ex, 'direct', {durable: false});

return ok.then(function() {
ch.publish(ex, severity, new Buffer(message));
ch.publish(ex, severity, Buffer.from(message));
console.log(" [x] Sent %s:'%s'", severity, message);
return ch.close();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/emit_log_topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ amqp.connect('amqp://localhost').then(function(conn) {
var ex = 'topic_logs';
var ok = ch.assertExchange(ex, 'topic', {durable: false});
return ok.then(function() {
ch.publish(ex, key, new Buffer(message));
ch.publish(ex, key, Buffer.from(message));
console.log(" [x] Sent %s:'%s'", key, message);
return ch.close();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/new_task.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ amqp.connect('amqp://localhost').then(function(conn) {

return ok.then(function() {
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.sendToQueue(q, new Buffer(msg), {deliveryMode: true});
ch.sendToQueue(q, Buffer.from(msg), {deliveryMode: true});
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/rpc_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ amqp.connect('amqp://localhost').then(function(conn) {

ok = ok.then(function(queue) {
console.log(' [x] Requesting fib(%d)', n);
ch.sendToQueue('rpc_queue', new Buffer(n.toString()), {
ch.sendToQueue('rpc_queue', Buffer.from(n.toString()), {
correlationId: corrId, replyTo: queue
});
});
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/rpc_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ amqp.connect('amqp://localhost').then(function(conn) {
console.log(' [.] fib(%d)', n);
var response = fib(n);
ch.sendToQueue(msg.properties.replyTo,
new Buffer(response.toString()),
Buffer.from(response.toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ amqp.connect('amqp://localhost').then(function(conn) {
// (when `false`) that you should wait for the event `'drain'`
// to fire before writing again. We're just doing the one write,
// so we'll ignore it.
ch.sendToQueue(q, new Buffer(msg));
ch.sendToQueue(q, Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/waitForConfirms.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function mkCallback(i) {
amqp.connect().then(function(c) {
c.createConfirmChannel().then(function(ch) {
for (var i=0; i < NUM_MSGS; i++) {
ch.publish('amq.topic', 'whatever', new Buffer('blah'), {}, mkCallback(i));
ch.publish('amq.topic', 'whatever', Buffer.from('blah'), {}, mkCallback(i));
}
ch.waitForConfirms().then(function() {
console.log('All messages done');
Expand Down
4 changes: 2 additions & 2 deletions lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var EventEmitter = require('events').EventEmitter;
var fmt = require('util').format;
var IllegalOperationError = require('./error').IllegalOperationError;
var stackCapture = require('./error').stackCapture;

var Buffer = require('safe-buffer').Buffer
function Channel(connection) {
EventEmitter.call( this );
this.connection = connection;
Expand Down Expand Up @@ -286,7 +286,7 @@ function acceptMessage(continuation) {

// for zero-length messages, content frames aren't required.
if (totalSize === 0) {
message.content = new Buffer(0);
message.content = Buffer.alloc(0);
continuation(message);
return acceptDeliveryOrReturn;
}
Expand Down
7 changes: 4 additions & 3 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var constants = defs.constants;
var frame = require('./frame');
var HEARTBEAT = frame.HEARTBEAT;
var Mux = require('./mux').Mux;
var Buffer = require('safe-buffer').Buffer

var Duplex =
require('stream').Duplex ||
Expand Down Expand Up @@ -43,7 +44,7 @@ function Connection(underlying) {
this.muxer = new Mux(stream);

// frames
this.rest = new Buffer(0);
this.rest = Buffer.alloc(0);
this.frameMax = constants.FRAME_MIN_SIZE;
this.sentSinceLastCheck = false;
this.recvSinceLastCheck = false;
Expand Down Expand Up @@ -550,7 +551,7 @@ C.sendMessage = function(channel,
var allLen = methodHeaderLen + bodyLen;

if (allLen < SINGLE_CHUNK_THRESHOLD) {
var all = new Buffer(allLen);
var all = Buffer.alloc(allLen);
var offset = mframe.copy(all, 0);
offset += pframe.copy(all, offset);

Expand All @@ -560,7 +561,7 @@ C.sendMessage = function(channel,
}
else {
if (methodHeaderLen < SINGLE_CHUNK_THRESHOLD) {
var both = new Buffer(methodHeaderLen);
var both = Buffer.alloc(methodHeaderLen);
var offset = mframe.copy(both, 0);
pframe.copy(both, offset);
buffer.write(both);
Expand Down
5 changes: 3 additions & 2 deletions lib/credentials.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
// * PLAIN (send username and password in the plain)
// * EXTERNAL (assume the server will figure out who you are from
// context, i.e., your SSL certificate)
var Buffer = require('safe-buffer').Buffer

module.exports.plain = function(user, passwd) {
return {
mechanism: 'PLAIN',
response: function() {
return new Buffer(['', user, passwd].join(String.fromCharCode(0)))
return Buffer.from(['', user, passwd].join(String.fromCharCode(0)))
},
username: user,
password: passwd
Expand All @@ -23,6 +24,6 @@ module.exports.plain = function(user, passwd) {
module.exports.external = function() {
return {
mechanism: 'EXTERNAL',
response: function() { return new Buffer(''); }
response: function() { return Buffer.from(''); }
}
}
3 changes: 2 additions & 1 deletion lib/frame.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
var defs = require('./defs');
var constants = defs.constants;
var decode = defs.decode;
var Buffer = require('safe-buffer').Buffer

var Bits = require('bitsyntax');

Expand Down Expand Up @@ -105,7 +106,7 @@ module.exports.decodeFrame = function(frame) {
}

// encoded heartbeat
module.exports.HEARTBEAT_BUF = new Buffer([constants.FRAME_HEARTBEAT,
module.exports.HEARTBEAT_BUF = Buffer.from([constants.FRAME_HEARTBEAT,
0, 0, 0, 0, // size = 0
0, 0, // channel = 0
constants.FRAME_END]);
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
"url": "https://github.com/squaremo/amqp.node.git"
},
"engines": {
"node": ">=0.8 <6 || ^6"
"node": ">=0.8 <7.8"
},
"dependencies": {
"bitsyntax": "~0.0.4",
"bluebird": "^3.4.6",
"buffer-more-ints": "0.0.2",
"readable-stream": "1.x >=1.1.9",
"bluebird": "^3.4.6"
"safe-buffer": "^5.0.1"
},
"devDependencies": {
"mocha": "~1",
Expand Down
11 changes: 6 additions & 5 deletions test/callback_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var schedule = util.schedule;
var randomString = util.randomString;
var kCallback = util.kCallback;
var domain = require('domain');
var Buffer = require('safe-buffer').Buffer;

var URL = process.env.URL || 'amqp://localhost';

Expand Down Expand Up @@ -159,7 +160,7 @@ channel_test('send to queue and consume noAck', function(ch, done) {
else done(new Error("message content doesn't match:" +
msg + " =/= " + m.content.toString()));
}, {noAck: true, exclusive: true});
ch.sendToQueue(q.queue, new Buffer(msg));
ch.sendToQueue(q.queue, Buffer.from(msg));
});
});

Expand All @@ -175,15 +176,15 @@ channel_test('send to queue and consume ack', function(ch, done) {
else done(new Error("message content doesn't match:" +
msg + " =/= " + m.content.toString()));
}, {noAck: false, exclusive: true});
ch.sendToQueue(q.queue, new Buffer(msg));
ch.sendToQueue(q.queue, Buffer.from(msg));
});
});

channel_test('send to and get from queue', function(ch, done) {
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e != null) return done(e);
var msg = randomString();
ch.sendToQueue(q.queue, new Buffer(msg));
ch.sendToQueue(q.queue, Buffer.from(msg));
waitForMessages(ch, q.queue, function(e, _) {
if (e != null) return done(e);
ch.get(q.queue, {noAck: true}, function(e, m) {
Expand All @@ -210,12 +211,12 @@ confirm_channel_test('Receive confirmation', function(ch, done) {
// An unroutable message, on the basis that you're not allowed a
// queue with an empty name, and you can't make bindings to the
// default exchange. Tricky eh?
ch.publish('', '', new Buffer('foo'), {}, done);
ch.publish('', '', Buffer.from('foo'), {}, done);
});

confirm_channel_test('Wait for confirms', function(ch, done) {
for (var i=0; i < 1000; i++) {
ch.publish('', '', new Buffer('foo'), {});
ch.publish('', '', Buffer.from('foo'), {});
}
ch.waitForConfirms(done);
});
Expand Down
Loading

0 comments on commit 31bb2a9

Please sign in to comment.