-
Notifications
You must be signed in to change notification settings - Fork 2k
/
topics.test.js
169 lines (148 loc) · 6.07 KB
/
topics.test.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/**
* Copyright 2017, Google, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const path = require(`path`);
const pubsub = require(`@google-cloud/pubsub`)();
const test = require(`ava`);
const tools = require(`@google-cloud/nodejs-repo-tools`);
const uuid = require(`uuid`);
// Parent directory of this test file; passed as the second argument to
// tools.runAsync for every sample command below.
const cwd = path.join(__dirname, `..`);
// Topic and subscription names get a fresh v4 UUID suffix on every run
// (presumably so that concurrent or repeated runs do not collide).
const topicNameOne = `nodejs-docs-samples-test-${uuid.v4()}`;
const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameOne = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
// Project ID is read from the GCLOUD_PROJECT environment variable.
const projectId = process.env.GCLOUD_PROJECT;
// Fully-qualified form of topicNameOne; compared against the sample's output
// and against the `name` of topics returned by pubsub.getTopics().
const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`;
// Payload published by the publish tests and expected back on the subscription.
const expectedMessage = { data: `Hello, world!` };
// Command prefix used to invoke the sample under test.
const cmd = `node topics.js`;
// Run the repo-tools credential check before anything else.
test.before(tools.checkCredentials);

// Create the second topic up front (it is used by the ordered-messages test).
// Any failure, whether thrown synchronously or delivered as a rejection, is
// deliberately swallowed.
test.before(() => {
  return Promise.resolve()
    .then(() => pubsub.createTopic(topicNameTwo))
    .then(() => undefined, () => undefined); // ignore error
});
// Best-effort cleanup of every resource the tests may have created.
// Each deletion is attempted in order and independently: a failure of one
// (for example because the resource was never created or is already gone)
// does not prevent the remaining deletions from running.
test.after.always(async () => {
  const deletions = [
    () => pubsub.subscription(subscriptionNameOne).delete(),
    () => pubsub.topic(topicNameOne).delete(),
    () => pubsub.subscription(subscriptionNameTwo).delete(),
    () => pubsub.topic(topicNameTwo).delete()
  ];

  for (const remove of deletions) {
    try {
      await remove();
    } catch (err) {
      // ignore error
    }
  }
});
/**
 * Helper function to pull one message.
 *
 * Subscribes to `message` events on the given object, acknowledges the first
 * message that arrives and resolves with it. The listener and the timeout
 * timer are both released as soon as the promise settles, so nothing is left
 * attached to the subscription and no timer keeps the process alive.
 *
 * @param {object} subscriptionObj Object exposing `on` and `removeListener`
 *     that emits `message` events whose payload has an `ack()` method.
 * @param {number} [timeout] Milliseconds to wait for a message. Any falsy
 *     value (including 0) falls back to 10000.
 * @returns {Promise<object>} Resolves with the first received message after
 *     it has been acked; rejects with an Error when the timeout elapses first.
 */
const _pullOneMessage = (subscriptionObj, timeout) => {
  const timeoutMs = timeout || 10000; // 10 second timeout by default

  return new Promise((resolve, reject) => {
    let timer;

    // Detach the listener and cancel the pending timeout.
    const cleanup = () => {
      clearTimeout(timer);
      subscriptionObj.removeListener(`message`, messageHandler);
    };

    // First message received; ack it + resolve promise
    const messageHandler = (received) => {
      received.ack();
      cleanup();
      resolve(received);
    };

    // Timeout appropriately, removing the listener so it cannot leak or ack
    // a message on behalf of a call that has already failed.
    timer = setTimeout(() => {
      cleanup();
      reject(new Error(`_pullOneMessage timed out`));
    }, timeoutMs);

    // Listen for new messages
    subscriptionObj.on(`message`, messageHandler);
  });
};
// Runs the sample's `create` command, checks its output, then confirms via the
// client library that the new topic shows up in the topic listing.
test.serial(`should create a topic`, async (t) => {
  t.plan(1);

  const createOutput = await tools.runAsync(`${cmd} create ${topicNameOne}`, cwd);
  t.is(createOutput, `Topic ${fullTopicNameOne} created.`);

  // Retried through tools.tryTest; uses the `assert` it supplies, not `t`.
  const topicIsListed = async (assert) => {
    const [allTopics] = await pubsub.getTopics();
    const topicNames = allTopics.map((topic) => topic.name);
    assert(topicNames.includes(fullTopicNameOne));
  };
  await tools.tryTest(topicIsListed).start();
});
// Runs the sample's `list` command and checks that the header and the topic
// created earlier both appear in its output.
//
// The checks inside the retried callback use the `assert` function supplied by
// tools.tryTest (as the create/delete tests in this file do) instead of
// `t.true`. An AVA assertion made through `t` is recorded on the test rather
// than signalling failure to the retry wrapper, so with `t.true` a listing
// that did not yet include the topic would fail the test on the first attempt
// and never be retried.
test.serial(`should list topics`, async (t) => {
  await tools.tryTest(async (assert) => {
    const output = await tools.runAsync(`${cmd} list`, cwd);
    assert(output.includes(`Topics:`));
    assert(output.includes(fullTopicNameOne));
  }).start();

  // Record an assertion through `t` once the retried check has succeeded,
  // since the checks above no longer go through `t`.
  t.pass();
});
// Publishes a plain-text message through the sample CLI and verifies that the
// same text arrives on a subscription attached to the topic.
test.serial(`should publish a simple message`, async (t) => {
  t.plan(1);

  const topic = pubsub.topic(topicNameOne);
  const [subscription] = await topic.createSubscription(subscriptionNameOne);

  const publishCommand = `${cmd} publish ${topicNameOne} "${expectedMessage.data}"`;
  await tools.runAsync(publishCommand, cwd);

  const received = await _pullOneMessage(subscription);
  const receivedText = received.data.toString();
  t.is(receivedText, expectedMessage.data);
});
// Publishes the JSON-serialized expected message through the sample CLI and
// verifies that the payload received on the subscription parses back to it.
test.serial(`should publish a JSON message`, async (t) => {
  const topic = pubsub.topic(topicNameOne);
  const [subscription] = await topic.createSubscription(subscriptionNameOne);

  const jsonPayload = JSON.stringify(expectedMessage);
  await tools.runAsync(`${cmd} publish ${topicNameOne} '${jsonPayload}'`, cwd);

  const received = await _pullOneMessage(subscription);
  const parsed = JSON.parse(received.data.toString());
  t.deepEqual(parsed, expectedMessage);
});
// Calls the sample module's publishOrderedMessage directly (not via the CLI)
// and checks that each published message arrives with the returned id, the
// expected data, and a `counterId` attribute that counts up from '1'.
test.serial(`should publish ordered messages`, async (t) => {
  const topics = require(`../topics`);
  const [subscription] = await pubsub.topic(topicNameTwo).createSubscription(subscriptionNameTwo);

  // Publish and verify two messages, one after the other.
  for (const expectedCounterId of [`1`, `2`]) {
    const publishedId = await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data);
    const received = await _pullOneMessage(subscription);

    t.is(received.id, publishedId);
    t.is(received.data.toString(), expectedMessage.data);
    t.is(received.attributes.counterId, expectedCounterId);
  }

  // A third message is published but neither pulled nor asserted on.
  await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data);
});
// Runs the sample's `set-policy` command and verifies, via the client library,
// that the topic's policy now contains exactly the expected bindings.
test.serial(`should set the IAM policy for a topic`, async (t) => {
  const expectedBindings = [
    {
      role: `roles/pubsub.editor`,
      members: [`group:cloud-logs@google.com`]
    },
    {
      role: `roles/pubsub.viewer`,
      members: [`allUsers`]
    }
  ];

  await tools.runAsync(`${cmd} set-policy ${topicNameOne}`, cwd);

  const [policy] = await pubsub.topic(topicNameOne).iam.getPolicy();
  t.deepEqual(policy.bindings, expectedBindings);
});
// Fetches the topic's policy with the client library, then checks that the
// sample's `get-policy` command prints the same bindings.
test.serial(`should get the IAM policy for a topic`, async (t) => {
  const policyResponse = await pubsub.topic(topicNameOne).iam.getPolicy();
  const [currentPolicy] = policyResponse;

  const cliOutput = await tools.runAsync(`${cmd} get-policy ${topicNameOne}`, cwd);

  const serializedBindings = JSON.stringify(currentPolicy.bindings);
  t.is(cliOutput, `Policy for topic: ${serializedBindings}.`);
});
// Runs the sample's `test-permissions` command and checks that its output
// contains the expected confirmation text.
test.serial(`should test permissions for a topic`, async (t) => {
  const permissionsCommand = `${cmd} test-permissions ${topicNameOne}`;
  const cliOutput = await tools.runAsync(permissionsCommand, cwd);

  const mentionsTestedPermissions = cliOutput.includes(`Tested permissions for topic`);
  t.true(mentionsTestedPermissions);
});
// Runs the sample's `delete` command, checks its output, then confirms via the
// client library that the topic no longer appears in the topic listing.
test.serial(`should delete a topic`, async (t) => {
  t.plan(1);

  const deleteOutput = await tools.runAsync(`${cmd} delete ${topicNameOne}`, cwd);
  t.is(deleteOutput, `Topic ${fullTopicNameOne} deleted.`);

  // Retried through tools.tryTest; uses the `assert` it supplies, not `t`.
  const topicIsGone = async (assert) => {
    const [remainingTopics] = await pubsub.getTopics();
    const remainingNames = remainingTopics.map((topic) => topic.name);
    assert(!remainingNames.includes(fullTopicNameOne));
  };
  await tools.tryTest(topicIsGone).start();
});