Skip to content

Commit

Permalink
test: add message status, return message IDs for produced messages
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Nov 26, 2023
1 parent 56566bf commit 8e3c228
Show file tree
Hide file tree
Showing 22 changed files with 124 additions and 119 deletions.
17 changes: 8 additions & 9 deletions tests/common/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Consumer } from '../../src/lib/consumer/consumer';
import { Message } from '../../src/lib/message/message';
import { events } from '../../src/common/events/events';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -27,27 +26,27 @@ export async function consumerOnEvent<T extends Array<any>>(

export async function untilMessageAcknowledged(
consumer: Consumer,
msg?: Message,
messageId?: string,
): Promise<void> {
const [message] = await consumerOnEvent<[Message]>(
const [id] = await consumerOnEvent<[string]>(
consumer,
events.MESSAGE_ACKNOWLEDGED,
);
if (msg && msg.getRequiredId() !== message.getRequiredId()) {
await untilMessageAcknowledged(consumer, msg);
if (messageId && messageId !== id) {
await untilMessageAcknowledged(consumer, messageId);
}
}

export async function untilMessageDeadLettered(
consumer: Consumer,
msg?: Message,
messageId?: string,
): Promise<void> {
const [message] = await consumerOnEvent<[Message]>(
const [, id] = await consumerOnEvent<[string, string]>(
consumer,
events.MESSAGE_DEAD_LETTERED,
);
if (msg && msg.getRequiredId() !== message.getRequiredId()) {
await untilMessageDeadLettered(consumer, msg);
if (messageId && messageId !== id) {
await untilMessageDeadLettered(consumer, messageId);
}
}

Expand Down
20 changes: 10 additions & 10 deletions tests/common/message-producing-consuming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ export async function produceAndAcknowledgeMessage(

const message = new Message();
message.setBody({ hello: 'world' }).setQueue(queue);
await producer.produceAsync(message);
const { messages } = await producer.produceAsync(message);

consumer.run();
await untilMessageAcknowledged(consumer);
return { producer, consumer, queue, message };
return { producer, consumer, queue, messageId: messages[0] };
}

export async function produceAndDeadLetterMessage(
Expand All @@ -60,11 +60,11 @@ export async function produceAndDeadLetterMessage(

const message = new Message();
message.setBody({ hello: 'world' }).setQueue(queue);
await producer.produceAsync(message);
const { messages } = await producer.produceAsync(message);

consumer.run();
await untilConsumerEvent(consumer, events.MESSAGE_DEAD_LETTERED);
return { producer, consumer, message, queue };
return { producer, consumer, messageId: messages[0], queue };
}

export async function produceMessage(queue: IQueueParams = defaultQueue) {
Expand All @@ -73,8 +73,8 @@ export async function produceMessage(queue: IQueueParams = defaultQueue) {

const message = new Message();
message.setBody({ hello: 'world' }).setQueue(queue);
await producer.produceAsync(message);
return { producer, message, queue };
const { messages } = await producer.produceAsync(message);
return { producer, messageId: messages[0], queue };
}

export async function produceMessageWithPriority(
Expand All @@ -85,8 +85,8 @@ export async function produceMessageWithPriority(

const message = new Message();
message.setPriority(Message.MessagePriority.LOW).setQueue(queue);
await producer.produceAsync(message);
return { message, producer, queue };
const { messages } = await producer.produceAsync(message);
return { messageId: messages[0], producer, queue };
}

export async function scheduleMessage(queue: IQueueParams = defaultQueue) {
Expand All @@ -95,8 +95,8 @@ export async function scheduleMessage(queue: IQueueParams = defaultQueue) {

const message = new Message();
message.setScheduledDelay(10000).setQueue(queue);
await producer.produceAsync(message);
return { message, producer, queue };
const { messages } = await producer.produceAsync(message);
return { messageId: messages[0], producer, queue };
}

export async function createQueue(
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/consuming-messages/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ test('Produce and consume 1 message', async () => {

consumer.run();

await untilMessageAcknowledged(consumer, msg);
await untilMessageAcknowledged(consumer, msg.getRequiredId());
});
14 changes: 5 additions & 9 deletions tests/tests/consuming-messages/test00016.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test('Consume message from different queues using a single consumer instance: ca

const consumer = promisifyAll(new Consumer());
await consumer.consumeAsync('test_queue', (msg, cb) => {
cb();
setTimeout(cb, 1000);
});
await consumer.consumeAsync('another_queue', (msg, cb) => {
cb();
Expand All @@ -32,16 +32,12 @@ test('Consume message from different queues using a single consumer instance: ca
await producer.runAsync();

const msg1 = new Message().setQueue('test_queue').setBody('some data');
setTimeout(() => {
producer.produceAsync(msg1);
}, 1000);
await untilMessageAcknowledged(consumer, msg1);
const { messages } = await producer.produceAsync(msg1);
await untilMessageAcknowledged(consumer, messages[0]);

const msg2 = new Message().setQueue('another_queue').setBody('some data');
setTimeout(() => {
producer.produceAsync(msg2);
}, 1000);
await untilMessageAcknowledged(consumer, msg2);
const { messages: m } = await producer.produceAsync(msg2);
await untilMessageAcknowledged(consumer, m[0]);

await shutDownBaseInstance(consumer);
});
10 changes: 7 additions & 3 deletions tests/tests/consuming-messages/test00018.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
import {
createQueue,
defaultQueue,
produceMessage,
} from '../../common/message-producing-consuming';
import { ProducerMessageAlreadyPublishedError } from '../../../src/lib/producer/errors';
import { Message } from '../../../src/lib/message/message';
import { getProducer } from '../../common/producer';

test('Producing duplicate message', async () => {
await createQueue(defaultQueue, false);
const { producer, message } = await produceMessage();
const m = new Message().setQueue(defaultQueue).setBody('123');
const p = await getProducer();
await p.runAsync();
await p.produceAsync(m);
await expect(async () => {
await producer.produceAsync(message);
await p.produceAsync(m);
}).rejects.toThrow(ProducerMessageAlreadyPublishedError);
});
6 changes: 3 additions & 3 deletions tests/tests/deleting-messages/test00001.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import { getQueuePendingMessages } from '../../common/queue-pending-messages';
test('Combined test: Delete a pending message. Check pending message. Check queue metrics.', async () => {
await createQueue(defaultQueue, false);

const { queue, message } = await produceMessage();
const { queue, messageId } = await produceMessage();

const pendingMessages = await getQueuePendingMessages();
const res1 = await pendingMessages.getMessagesAsync(queue, 0, 100);

expect(res1.totalItems).toBe(1);
expect(res1.items[0].getId()).toBe(message.getRequiredId());
expect(res1.items[0].getId()).toBe(messageId);

const count = await pendingMessages.countMessagesAsync(queue);
expect(count).toBe(1);

await pendingMessages.deleteMessageAsync(queue, message.getRequiredId());
await pendingMessages.deleteMessageAsync(queue, messageId);

const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100);
expect(res2.totalItems).toBe(0);
Expand Down
6 changes: 3 additions & 3 deletions tests/tests/deleting-messages/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import { getQueueMessages } from '../../common/queue-messages';

test('Combined test: Delete a pending message with priority. Check pending message. Check queue metrics.', async () => {
await createQueue(defaultQueue, true);
const { message, queue } = await produceMessageWithPriority();
const { messageId, queue } = await produceMessageWithPriority();
const pendingMessages = await getQueuePendingMessages();
const res1 = await pendingMessages.getMessagesAsync(queue, 0, 100);

expect(res1.totalItems).toBe(1);
expect(res1.items[0].getId()).toBe(message.getRequiredId());
expect(res1.items[0].getId()).toBe(messageId);

const queueMessages = await getQueueMessages();
const count = await queueMessages.countMessagesByStatusAsync(queue);
expect(count.pending).toBe(1);

await pendingMessages.deleteMessageAsync(queue, message.getRequiredId());
await pendingMessages.deleteMessageAsync(queue, messageId);
const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100);
expect(res2.totalItems).toBe(0);
expect(res2.items.length).toBe(0);
Expand Down
11 changes: 4 additions & 7 deletions tests/tests/deleting-messages/test00003.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { QueueMessageNotFoundError } from '../../../src/lib/queue/errors';

test('Combined test: Delete an acknowledged message. Check pending, acknowledged, and dead-letter message. Check queue metrics.', async () => {
await createQueue(defaultQueue, false);
const { queue, message } = await produceAndAcknowledgeMessage();
const { queue, messageId } = await produceAndAcknowledgeMessage();
const deadLetteredMessages = await getQueueDeadLetteredMessages();
const res0 = await deadLetteredMessages.countMessagesAsync(queue);
expect(res0).toBe(0);
Expand All @@ -37,14 +37,14 @@ test('Combined test: Delete an acknowledged message. Check pending, acknowledged
const res3 = await acknowledgedMessages.getMessagesAsync(queue, 0, 100);
expect(res3.totalItems).toBe(1);
expect(res3.items.length).toBe(1);
expect(res3.items[0]).toEqual(message);
expect(res3.items[0].getRequiredId()).toEqual(messageId);

const queueMessages = await getQueueMessages();
const count = await queueMessages.countMessagesByStatusAsync(queue);
expect(count.pending).toBe(0);
expect(count.acknowledged).toBe(1);

await acknowledgedMessages.deleteMessageAsync(queue, message.getRequiredId());
await acknowledgedMessages.deleteMessageAsync(queue, messageId);

const res4 = await acknowledgedMessages.getMessagesAsync(queue, 0, 100);
expect(res4.totalItems).toBe(0);
Expand All @@ -64,9 +64,6 @@ test('Combined test: Delete an acknowledged message. Check pending, acknowledged
expect(count1.deadLettered).toBe(0);

await expect(async () => {
await acknowledgedMessages.deleteMessageAsync(
queue,
message.getRequiredId(),
);
await acknowledgedMessages.deleteMessageAsync(queue, messageId);
}).rejects.toThrow(QueueMessageNotFoundError);
});
11 changes: 4 additions & 7 deletions tests/tests/deleting-messages/test00004.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { QueueMessageNotFoundError } from '../../../src/lib/queue/errors';

test('Combined test: Delete a dead-letter message. Check pending, acknowledged, and dead-letter message. Check queue metrics.', async () => {
await createQueue(defaultQueue, false);
const { queue, message } = await produceAndDeadLetterMessage();
const { queue, messageId } = await produceAndDeadLetterMessage();

const deadLetteredMessages = await getQueueDeadLetteredMessages();
const res0 = await deadLetteredMessages.countMessagesAsync(queue);
Expand All @@ -38,15 +38,15 @@ test('Combined test: Delete a dead-letter message. Check pending, acknowledged,
const res3 = await deadLetteredMessages.getMessagesAsync(queue, 0, 100);
expect(res3.totalItems).toBe(1);
expect(res3.items.length).toBe(1);
expect(res3.items[0].getId()).toEqual(message.getRequiredId());
expect(res3.items[0].getId()).toEqual(messageId);

const queueMessages = await getQueueMessages();
const count = await queueMessages.countMessagesByStatusAsync(queue);
expect(count.pending).toBe(0);
expect(count.acknowledged).toBe(0);
expect(count.deadLettered).toBe(1);

await deadLetteredMessages.deleteMessageAsync(queue, message.getRequiredId());
await deadLetteredMessages.deleteMessageAsync(queue, messageId);

const res4 = await acknowledgedMessages.getMessagesAsync(queue, 0, 100);
expect(res4.totalItems).toBe(0);
Expand All @@ -66,9 +66,6 @@ test('Combined test: Delete a dead-letter message. Check pending, acknowledged,
expect(count1.deadLettered).toBe(0);

await expect(async () => {
await deadLetteredMessages.deleteMessageAsync(
queue,
message.getRequiredId(),
);
await deadLetteredMessages.deleteMessageAsync(queue, messageId);
}).rejects.toThrow(QueueMessageNotFoundError);
});
Loading

0 comments on commit 8e3c228

Please sign in to comment.