Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdobry committed Sep 28, 2016
1 parent d883ec2 commit d326b8b
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 90 deletions.
1 change: 1 addition & 0 deletions pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"yargs": "^5.0.0"
},
"devDependencies": {
"async": "^2.0.1",
"mocha": "^3.0.2",
"node-uuid": "^1.4.7"
},
Expand Down
20 changes: 11 additions & 9 deletions pubsub/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,28 +216,30 @@ function pullOrderedMessages (subscriptionName, callback) {
return;
}

// Pub/Sub messages are unordered, so here we manually order messages by
// their "counterId" attribute which was set when they were published.
messages.forEach((message) => {
outstandingMessages[message.attributes.orderId] = message;
outstandingMessages[message.attributes.counterId] = message;
});

const outstandingIds = Object.keys(outstandingMessages).map((orderId) => +orderId);
const outstandingIds = Object.keys(outstandingMessages).map((counterId) => +counterId);
outstandingIds.sort();

outstandingIds.forEach((orderId) => {
outstandingIds.forEach((counterId) => {
const counter = getSubscribeCounterValue();
const message = outstandingMessages[orderId];
const message = outstandingMessages[counterId];

if (orderId < counter) {
if (counterId < counter) {
// The message has already been processed
subscription.ack(message.ackId);
delete outstandingMessages[orderId];
} else if (orderId === counter) {
delete outstandingMessages[counterId];
} else if (counterId === counter) {
// Process the message
console.log(`* %d %j %j`, message.id, message.data, message.attributes);

setSubscribeCounterValue(orderId + 1);
setSubscribeCounterValue(counterId + 1);
subscription.ack(message.ackId);
delete outstandingMessages[orderId];
delete outstandingMessages[counterId];
} else {
// Have not yet processed the message on which this message is dependent
return false;
Expand Down
73 changes: 39 additions & 34 deletions pubsub/system-test/subscriptions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

'use strict';

const async = require(`async`);
const pubsub = require(`@google-cloud/pubsub`)();
const uuid = require(`node-uuid`);
const path = require(`path`);
Expand Down Expand Up @@ -115,40 +116,44 @@ describe(`pubsub:subscriptions`, () => {
it(`should pull ordered messages`, (done) => {
const subscriptions = require('../subscriptions');
const expected = `Hello, world!`;
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '3' } }, (err, firstMessageIds) => {
assert.ifError(err);
setTimeout(() => {
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
assert.ifError(err);
assert.equal(console.log.callCount, 0);
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '1' } }, (err, secondMessageIds) => {
assert.ifError(err);
setTimeout(() => {
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
assert.ifError(err);
assert.equal(console.log.callCount, 1);
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, secondMessageIds[0], expected, { orderId: '1' }]);
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '1' } }, (err) => {
assert.ifError(err);
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '2' } }, (err, thirdMessageIds) => {
assert.ifError(err);
setTimeout(() => {
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
assert.ifError(err);
assert.equal(console.log.callCount, 3);
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, thirdMessageIds[0], expected, { orderId: '2' }]);
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, firstMessageIds[0], expected, { orderId: '3' }]);
done();
});
}, 2000);
});
});
});
}, 2000);
});
});
}, 2000);
});
const publishedMessageIds = [];
let firstMessageIds, secondMessageIds, thirdMessageIds;

async.waterfall([
(cb) => {
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '3' } }, cb)
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 0);
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 1);
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
},
(messageIds, apiResponse, cb) => {
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '2' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 3);
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
cb();
}
], done);
});

it(`should set the IAM policy for a subscription`, (done) => {
Expand Down
98 changes: 54 additions & 44 deletions pubsub/system-test/topics.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

'use strict';

const async = require(`async`);
const pubsub = require(`@google-cloud/pubsub`)();
const uuid = require(`node-uuid`);
const path = require(`path`);
Expand Down Expand Up @@ -58,57 +59,66 @@ describe(`pubsub:topics`, () => {
});

it(`should publish a simple message`, (done) => {
pubsub.topic(topicName).subscribe(subscriptionName, (err, subscription) => {
assert.ifError(err);
run(`${cmd} publish ${topicName} "${message.data}"`, cwd);
setTimeout(() => {
subscription.pull((err, messages) => {
assert.ifError(err);
assert.equal(messages[0].data, message.data);
done();
});
}, 2000);
});
async.waterfall([
(cb) => {
pubsub.topic(topicName).subscribe(subscriptionName, cb);
},
(subscription, apiResponse, cb) => {
run(`${cmd} publish ${topicName} "${message.data}"`, cwd);
setTimeout(() => subscription.pull(cb), 2000);
},
(messages, apiResponse, cb) => {
assert.equal(messages[0].data, message.data);
cb();
}
], done);
});

it(`should publish a JSON message`, (done) => {
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, (err, subscription) => {
assert.ifError(err);
run(`${cmd} publish ${topicName} '${JSON.stringify(message)}'`, cwd);
setTimeout(() => {
subscription.pull((err, messages) => {
assert.ifError(err);
assert.deepEqual(messages[0].data, message);
done();
});
}, 2000);
});
async.waterfall([
(cb) => {
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, cb);
},
(subscription, apiResponse, cb) => {
run(`${cmd} publish ${topicName} '${JSON.stringify(message)}'`, cwd);
setTimeout(() => subscription.pull(cb), 2000);
},
(messages, apiResponse, cb) => {
assert.deepEqual(messages[0].data, message);
cb();
}
], done);
});

it(`should publish ordered messages`, (done) => {
const topics = require('../topics');
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, (err, subscription) => {
assert.ifError(err);
topics.publishOrderedMessage(topicName, message.data, () => {
setTimeout(() => {
subscription.pull((err, messages) => {
assert.ifError(err);
assert.equal(messages[0].data, message.data);
assert.equal(messages[0].attributes.orderId, '1');
topics.publishOrderedMessage(topicName, message.data, () => {
setTimeout(() => {
subscription.pull((err, messages) => {
assert.ifError(err);
assert.equal(messages[0].data, message.data);
assert.equal(messages[0].attributes.orderId, '2');
done();
});
}, 2000);
});
});
}, 2000);
});
});
let subscription;

async.waterfall([
(cb) => {
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, cb);
},
(_subscription, apiResponse, cb) => {
subscription = _subscription;
topics.publishOrderedMessage(topicName, message.data, cb);
},
(cb) => {
setTimeout(() => subscription.pull(cb), 2000);
},
(messages, apiResponse, cb) => {
assert.equal(messages[0].data, message.data);
assert.equal(messages[0].attributes.counterId, '1');
topics.publishOrderedMessage(topicName, message.data, cb);
},
(cb) => {
setTimeout(() => subscription.pull(cb), 2000);
},
(messages, apiResponse, cb) => {
assert.equal(messages[0].data, message.data);
assert.equal(messages[0].attributes.counterId, '2');
topics.publishOrderedMessage(topicName, message.data, cb);
}
], done);
});

it(`should set the IAM policy for a topic`, (done) => {
Expand Down
7 changes: 4 additions & 3 deletions pubsub/topics.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ function publishOrderedMessage (topicName, data, callback) {
const message = {
data: data,

// Assign an order id to the message
// Pub/Sub messages are unordered, so assign an order id to the message to
// manually order messages
attributes: {
orderId: '' + getPublishCounterValue()
counterId: '' + getPublishCounterValue()
}
};

Expand All @@ -160,7 +161,7 @@ function publishOrderedMessage (topicName, data, callback) {
}

// Update the counter value
setPublishCounterValue(+message.attributes.orderId + 1);
setPublishCounterValue(+message.attributes.counterId + 1);

console.log(`Message ${messageIds[0]} published.`);
callback();
Expand Down

0 comments on commit d326b8b

Please sign in to comment.