Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing pub-sub samples #503

Merged
merged 2 commits into from
Oct 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,31 @@ View the [documentation][topics_0_docs] or the [source code][topics_0_code].
__Usage:__ `node topics.js --help`

```
topics.js <command>

Commands:
list Lists all topics in the current project.
create <topicName> Creates a new topic.
delete <topicName> Deletes a topic.
publish <topicName> <message> Publishes a message to a topic.
publish-ordered <topicName> <message> Publishes an ordered message to a topic.
get-policy <topicName> Gets the IAM policy for a topic.
set-policy <topicName> Sets the IAM policy for a topic.
test-permissions <topicName> Tests the permissions for a topic.
topics.js list Lists all topics in the current project.
topics.js create <topicName> Creates a new topic.
topics.js delete <topicName> Deletes a topic.
topics.js publish <topicName> <message> Publishes a message to a topic.
topics.js publish-batch <topicName> <message> Publishes messages to a topic using custom batching settings.
topics.js publish-ordered <topicName> <message> Publishes an ordered message to a topic.
topics.js get-policy <topicName> Gets the IAM policy for a topic.
topics.js set-policy <topicName> Sets the IAM policy for a topic.
topics.js test-permissions <topicName> Tests the permissions for a topic.

Options:
--help Show help [boolean]
--version Show version number [boolean]
--help Show help [boolean]

Examples:
node topics.js list
node topics.js create my-topic
node topics.js delete my-topic
node topics.js publish my-topic "Hello, world!"
node topics.js publish my-topic '{"data":"Hello, world!"}'
node topics.js publish-ordered my-topic "Hello, world!"
node topics.js publish-batch my-topic "Hello, world!" -w 1000
node topics.js get-policy greetings
node topics.js set-policy greetings
node topics.js test-permissions greetings
Expand All @@ -75,27 +81,39 @@ View the [documentation][subscriptions_1_docs] or the [source code][subscription
__Usage:__ `node subscriptions.js --help`

```
subscriptions.js <command>

Commands:
list [topicName] Lists all subscriptions in the current project, optionally filtering by a
topic.
create <topicName> <subscriptionName> Creates a new subscription.
create-push <topicName> <subscriptionName> Creates a new push subscription.
delete <subscriptionName> Deletes a subscription.
get <subscriptionName> Gets the metadata for a subscription.
listen <subscriptionName> Listens to messages for a subscription.
get-policy <subscriptionName> Gets the IAM policy for a subscription.
set-policy <subscriptionName> Sets the IAM policy for a subscription.
test-permissions <subscriptionName> Tests the permissions for a subscription.
subscriptions.js list [topicName] Lists all subscriptions in the current project,
optionally filtering by a topic.
subscriptions.js create <topicName> <subscriptionName> Creates a new subscription.
subscriptions.js create-flow <topicName> <subscriptionName> Creates a new subscription with flow-control limits,
which don't persist between subscriptions.
subscriptions.js create-push <topicName> <subscriptionName> Creates a new push subscription.
subscriptions.js modify-config <topicName> Modifies the configuration of an existing push
<subscriptionName> subscription.
subscriptions.js delete <subscriptionName> Deletes a subscription.
subscriptions.js get <subscriptionName> Gets the metadata for a subscription.
subscriptions.js listen-messages <subscriptionName> Listens to messages for a subscription.
subscriptions.js listen-errors <subscriptionName> Listens to messages and errors for a subscription.
subscriptions.js get-policy <subscriptionName> Gets the IAM policy for a subscription.
subscriptions.js set-policy <subscriptionName> Sets the IAM policy for a subscription.
subscriptions.js test-permissions <subscriptionName> Tests the permissions for a subscription.

Options:
--help Show help [boolean]
--version Show version number [boolean]
--help Show help [boolean]

Examples:
node subscriptions.js list
node subscriptions.js list my-topic
node subscriptions.js create my-topic worker-1
node subscriptions.js create-flow my-topic worker-1 -m 5
node subscriptions.js create-push my-topic worker-1
node subscriptions.js modify-config my-topic worker-1
node subscriptions.js get worker-1
node subscriptions.js listen-messages my-subscription
node subscriptions.js listen-errors my-subscription
node subscriptions.js delete worker-1
node subscriptions.js pull worker-1
node subscriptions.js get-policy worker-1
Expand Down
8 changes: 4 additions & 4 deletions pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
"test": "samples test run --cmd ava -- -T 30s --verbose system-test/*.test.js"
},
"dependencies": {
"@google-cloud/pubsub": "0.14.0",
"yargs": "8.0.2"
"@google-cloud/pubsub": "0.14.5",
"yargs": "10.0.3"
},
"devDependencies": {
"@google-cloud/nodejs-repo-tools": "1.4.17",
"@google-cloud/nodejs-repo-tools": "2.0.11",
"ava": "0.22.0",
"proxyquire": "1.8.0",
"sinon": "3.2.1"
"sinon": "4.0.1"
},
"cloud-repo-tools": {
"requiresKeyFile": true,
Expand Down
125 changes: 124 additions & 1 deletion pubsub/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,32 @@ function createSubscription (topicName, subscriptionName) {
}
// [END pubsub_create_subscription]

// [START pubsub_subscriber_flow_settings]
function createFlowControlledSubscription (topicName, subscriptionName, maxInProgress, maxBytes) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the Pub/Sub samples were never updated to hide the method definition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct - I can address that in a separate PR, if you'd like.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please

// Instantiates a client
const pubsub = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsub.topic(topicName);

// Creates a new subscription, e.g. "my-new-subscription"
// Note that flow control configurations are not persistent
return topic.createSubscription(subscriptionName, {
flowControl: {
maxBytes: maxBytes,
maxMessages: maxInProgress
}
})
.then((results) => {
const subscription = results[0];

console.log(`Subscription ${subscription.name} created with a maximum of ${maxInProgress} unprocessed messages.`);

return subscription;
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And weren't updated to handle errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct - see above comment.

}
// [END pubsub_subscriber_flow_settings]

// [START pubsub_create_push_subscription]
function createPushSubscription (topicName, subscriptionName) {
// Instantiates a client
Expand Down Expand Up @@ -112,6 +138,28 @@ function createPushSubscription (topicName, subscriptionName) {
}
// [END pubsub_create_push_subscription]

// [START pubsub_modify_push_config]
function modifyPushConfig (topicName, subscriptionName, pushEndpoint) {
// Instantiates a client
const pubsub = PubSub();

// References an existing topic and subscription, e.g. "my-topic" > "my-subscription"
const topic = pubsub.topic(topicName);
const subscription = topic.subscription(subscriptionName);

const options = {
// Set to an HTTPS endpoint of your choice. If necessary, register
// (authorize) the domain on which the server is hosted.
pushEndpoint: `https://${pubsub.projectId}.appspot.com/push`
};

return subscription.modifyPushConfig(options)
.then((results) => {
console.log(`Modified push config for subscription ${subscription.name}.`);
});
}
// [END pubsub_modify_push_config]

// [START pubsub_delete_subscription]
function deleteSubscription (subscriptionName) {
// Instantiates a client
Expand Down Expand Up @@ -246,6 +294,42 @@ function listenForOrderedMessages (subscriptionName, timeout) {
}
// [END pubsub_listen_ordered_messages]

// [START pubsub_listen_errors]
function listenForErrors (subscriptionName, timeout) {
// Instantiates a client
const pubsub = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function (message) {
// Do something with the message
console.log(`Message: ${message}`);

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Create an event handler to handle errors
const errorHandler = function (error) {
// Do something with the error
console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
return new Promise((resolve) => {
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);
setTimeout(() => {
subscription.removeListener(`message`, messageHandler);
subscription.removeListener(`error`, errorHandler);
resolve();
}, timeout * 1000);
});
}
// [END pubsub_listen_errors]

// [START pubsub_get_subscription_policy]
function getSubscriptionPolicy (subscriptionName) {
// Instantiates a client
Expand Down Expand Up @@ -349,12 +433,35 @@ const cli = require(`yargs`)
{},
(opts) => createSubscription(opts.topicName, opts.subscriptionName)
)
.command(
`create-flow <topicName> <subscriptionName>`,
`Creates a new subscription with flow-control limits, which don't persist between subscriptions.`,
{
maxInProgress: {
alias: 'm',
type: 'number',
default: 0
},
maxBytes: {
alias: 'b',
type: 'number',
default: 0
}
},
(opts) => createFlowControlledSubscription(opts.topicName, opts.subscriptionName, opts.maxInProgress, opts.maxBytes)
)
.command(
`create-push <topicName> <subscriptionName>`,
`Creates a new push subscription.`,
{},
(opts) => createPushSubscription(opts.topicName, opts.subscriptionName)
)
.command(
`modify-config <topicName> <subscriptionName>`,
`Modifies the configuration of an existing push subscription.`,
{},
(opts) => modifyPushConfig(opts.topicName, opts.subscriptionName)
)
.command(
`delete <subscriptionName>`,
`Deletes a subscription.`,
Expand All @@ -368,7 +475,7 @@ const cli = require(`yargs`)
(opts) => getSubscription(opts.subscriptionName)
)
.command(
`listen <subscriptionName>`,
`listen-messages <subscriptionName>`,
`Listens to messages for a subscription.`,
{
timeout: {
Expand All @@ -379,6 +486,18 @@ const cli = require(`yargs`)
},
(opts) => listenForMessages(opts.subscriptionName, opts.timeout)
)
.command(
`listen-errors <subscriptionName>`,
`Listens to messages and errors for a subscription.`,
{
timeout: {
alias: 't',
type: 'number',
default: 10
}
},
(opts) => listenForErrors(opts.subscriptionName, opts.timeout)
)
.command(
`get-policy <subscriptionName>`,
`Gets the IAM policy for a subscription.`,
Expand All @@ -400,8 +519,12 @@ const cli = require(`yargs`)
.example(`node $0 list`)
.example(`node $0 list my-topic`)
.example(`node $0 create my-topic worker-1`)
.example(`node $0 create-flow my-topic worker-1 -m 5`)
.example(`node $0 create-push my-topic worker-1`)
.example(`node $0 modify-config my-topic worker-1`)
.example(`node $0 get worker-1`)
.example(`node $0 listen-messages my-subscription`)
.example(`node $0 listen-errors my-subscription`)
.example(`node $0 delete worker-1`)
.example(`node $0 pull worker-1`)
.example(`node $0 get-policy worker-1`)
Expand Down
28 changes: 25 additions & 3 deletions pubsub/system-test/subscriptions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameOne = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const subscriptionNameTwo = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const subscriptionNameThree = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const subscriptionNameFour = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const projectId = process.env.GCLOUD_PROJECT;
const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`;
const fullSubscriptionNameOne = `projects/${projectId}/subscriptions/${subscriptionNameOne}`;
const fullSubscriptionNameTwo = `projects/${projectId}/subscriptions/${subscriptionNameTwo}`;
const fullSubscriptionNameFour = `projects/${projectId}/subscriptions/${subscriptionNameFour}`;
const cmd = `node subscriptions.js`;

test.before(tools.checkCredentials);
Expand Down Expand Up @@ -81,6 +83,12 @@ test.serial(`should create a push subscription`, async (t) => {
}).start();
});

test.serial(`should modify the config of an existing push subscription`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} modify-config ${topicNameTwo} ${subscriptionNameTwo}`, cwd);
t.is(output, `Modified push config for subscription ${fullSubscriptionNameTwo}.`);
});

test.serial(`should get metadata for a subscription`, async (t) => {
const output = await tools.runAsync(`${cmd} get ${subscriptionNameOne}`, cwd);
const expected = `Subscription: ${fullSubscriptionNameOne}` +
Expand Down Expand Up @@ -111,9 +119,8 @@ test.serial(`should list subscriptions for a topic`, async (t) => {
});

test.serial(`should listen for messages`, async (t) => {
const expected = `Hello, world!`;
const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(expected));
const output = await tools.runAsync(`${cmd} listen ${subscriptionNameOne}`, cwd);
const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(`Hello, world!`));
const output = await tools.runAsync(`${cmd} listen-messages ${subscriptionNameOne}`, cwd);
t.true(output.includes(`Received message ${messageIds[0]}:`));
});

Expand Down Expand Up @@ -148,6 +155,11 @@ test.serial(`should listen for ordered messages`, async (t) => {
});
});

test.serial(`should listen for error messages`, async (t) => {
const output = await tools.runAsyncWithIO(`${cmd} listen-errors nonexistent-subscription -t 3`, cwd);
t.true(output.stderr.includes(`Resource not found`));
});

test.serial(`should set the IAM policy for a subscription`, async (t) => {
await tools.runAsync(`${cmd} set-policy ${subscriptionNameOne}`, cwd);
const results = await pubsub.subscription(subscriptionNameOne).iam.getPolicy();
Expand Down Expand Up @@ -185,3 +197,13 @@ test.serial(`should delete a subscription`, async (t) => {
assert(subscriptions.every((s) => s.name !== fullSubscriptionNameOne));
}).start();
});

test.serial(`should create a subscription with flow control`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} create-flow ${topicNameTwo} ${subscriptionNameFour} -m 5 -b 1024`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameFour} created with a maximum of 5 unprocessed messages.`);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.topic(topicNameTwo).getSubscriptions();
assert(subscriptions.some((s) => s.name === fullSubscriptionNameFour));
}).start();
});
16 changes: 16 additions & 0 deletions pubsub/system-test/topics.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const topicNameOne = `nodejs-docs-samples-test-${uuid.v4()}`;
const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameOne = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameThree = `nodejs-docs-samples-test-${uuid.v4()}`;
const projectId = process.env.GCLOUD_PROJECT;
const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`;
const expectedMessage = { data: `Hello, world!` };
Expand All @@ -48,6 +49,9 @@ test.after.always(async () => {
try {
await pubsub.subscription(subscriptionNameTwo).delete();
} catch (err) {} // ignore error
try {
await pubsub.subscription(subscriptionNameThree).delete();
} catch (err) {} // ignore error
try {
await pubsub.topic(topicNameTwo).delete();
} catch (err) {} // ignore error
Expand Down Expand Up @@ -131,6 +135,18 @@ test.serial(`should publish ordered messages`, async (t) => {
await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data);
});

test.serial(`should publish with specific batch settings`, async (t) => {
t.plan(2);
const expectedWait = 1000;
const [subscription] = await pubsub.topic(topicNameOne).createSubscription(subscriptionNameThree);
const startTime = Date.now();
await tools.runAsync(`${cmd} publish-batch ${topicNameOne} "${expectedMessage.data}" -w ${expectedWait}`, cwd);
const receivedMessage = await _pullOneMessage(subscription);
const publishTime = Date.parse(receivedMessage.publishTime);
t.is(receivedMessage.data.toString(), expectedMessage.data);
t.true(publishTime - startTime > expectedWait);
});

test.serial(`should set the IAM policy for a topic`, async (t) => {
await tools.runAsync(`${cmd} set-policy ${topicNameOne}`, cwd);
const results = await pubsub.topic(topicNameOne).iam.getPolicy();
Expand Down
Loading