From 8e3c22854940e7de3ab911e99cba64793b6ef7a5 Mon Sep 17 00:00:00 2001 From: Weyoss Date: Sun, 26 Nov 2023 01:13:46 +0100 Subject: [PATCH] test: add message status, return message IDs for produced messages --- tests/common/events.ts | 17 +++--- tests/common/message-producing-consuming.ts | 20 +++--- .../consuming-messages/test00002.test.ts | 2 +- .../consuming-messages/test00016.test.ts | 14 ++--- .../consuming-messages/test00018.test.ts | 10 ++- .../tests/deleting-messages/test00001.test.ts | 6 +- .../tests/deleting-messages/test00002.test.ts | 6 +- .../tests/deleting-messages/test00003.test.ts | 11 ++-- .../tests/deleting-messages/test00004.test.ts | 11 ++-- tests/tests/event-listeners/test00001.test.ts | 61 +++++++++++-------- tests/tests/event-listeners/test00002.test.ts | 15 ++--- .../fanout-exchange/test00003.test.ts | 5 +- .../fanout-exchange/test00004.test.ts | 5 +- .../topic-exchange/test00004.test.ts | 5 +- .../topic-exchange/test00005.test.ts | 5 +- .../test00001.test.ts | 2 +- .../test00001.test.ts | 2 +- .../tests/queue-rate-limit/test00029.test.ts | 6 +- .../tests/queue-rate-limit/test00030.test.ts | 6 +- .../tests/queue-rate-limit/test00031.test.ts | 6 +- .../requeuing-messages/test00001.test.ts | 14 ++--- .../requeuing-messages/test00002.test.ts | 14 ++--- 22 files changed, 124 insertions(+), 119 deletions(-) diff --git a/tests/common/events.ts b/tests/common/events.ts index c9946485..57f79b3c 100644 --- a/tests/common/events.ts +++ b/tests/common/events.ts @@ -8,7 +8,6 @@ */ import { Consumer } from '../../src/lib/consumer/consumer'; -import { Message } from '../../src/lib/message/message'; import { events } from '../../src/common/events/events'; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -27,27 +26,27 @@ export async function consumerOnEvent>( export async function untilMessageAcknowledged( consumer: Consumer, - msg?: Message, + messageId?: string, ): Promise { - const [message] = await consumerOnEvent<[Message]>( + const [id] = await consumerOnEvent<[string]>( consumer, events.MESSAGE_ACKNOWLEDGED, ); - if (msg && msg.getRequiredId() !== message.getRequiredId()) { - await untilMessageAcknowledged(consumer, msg); + if (messageId && messageId !== id) { + await untilMessageAcknowledged(consumer, messageId); } } export async function untilMessageDeadLettered( consumer: Consumer, - msg?: Message, + messageId?: string, ): Promise { - const [message] = await consumerOnEvent<[Message]>( + const [, id] = await consumerOnEvent<[string, string]>( consumer, events.MESSAGE_DEAD_LETTERED, ); - if (msg && msg.getRequiredId() !== message.getRequiredId()) { - await untilMessageDeadLettered(consumer, msg); + if (messageId && messageId !== id) { + await untilMessageDeadLettered(consumer, messageId); } } diff --git a/tests/common/message-producing-consuming.ts b/tests/common/message-producing-consuming.ts index a6f91578..1b3be7d8 100644 --- a/tests/common/message-producing-consuming.ts +++ b/tests/common/message-producing-consuming.ts @@ -38,11 +38,11 @@ export async function produceAndAcknowledgeMessage( const message = new Message(); message.setBody({ hello: 'world' }).setQueue(queue); - await producer.produceAsync(message); + const { messages } = await producer.produceAsync(message); consumer.run(); await untilMessageAcknowledged(consumer); - return { producer, consumer, queue, message }; + return { producer, consumer, queue, messageId: messages[0] }; } export async function produceAndDeadLetterMessage( @@ -60,11 +60,11 @@ export async function produceAndDeadLetterMessage( const message = new Message(); message.setBody({ hello: 'world' }).setQueue(queue); - await producer.produceAsync(message); + const { messages } = await producer.produceAsync(message); consumer.run(); await untilConsumerEvent(consumer, events.MESSAGE_DEAD_LETTERED); - return { producer, consumer, message, queue }; + return { producer, consumer, messageId: messages[0], queue }; } export async function produceMessage(queue: IQueueParams = defaultQueue) { @@ -73,8 +73,8 @@ export async function produceMessage(queue: IQueueParams = defaultQueue) { const message = new Message(); message.setBody({ hello: 'world' }).setQueue(queue); - await producer.produceAsync(message); - return { producer, message, queue }; + const { messages } = await producer.produceAsync(message); + return { producer, messageId: messages[0], queue }; } export async function produceMessageWithPriority( @@ -85,8 +85,8 @@ export async function produceMessageWithPriority( const message = new Message(); message.setPriority(Message.MessagePriority.LOW).setQueue(queue); - await producer.produceAsync(message); - return { message, producer, queue }; + const { messages } = await producer.produceAsync(message); + return { messageId: messages[0], producer, queue }; } export async function scheduleMessage(queue: IQueueParams = defaultQueue) { @@ -95,8 +95,8 @@ export async function scheduleMessage(queue: IQueueParams = defaultQueue) { const message = new Message(); message.setScheduledDelay(10000).setQueue(queue); - await producer.produceAsync(message); - return { message, producer, queue }; + const { messages } = await producer.produceAsync(message); + return { messageId: messages[0], producer, queue }; } export async function createQueue( diff --git a/tests/tests/consuming-messages/test00002.test.ts b/tests/tests/consuming-messages/test00002.test.ts index cba84a67..d763ead9 100644 --- a/tests/tests/consuming-messages/test00002.test.ts +++ b/tests/tests/consuming-messages/test00002.test.ts @@ -39,5 +39,5 @@ test('Produce and consume 1 message', async () => { consumer.run(); - await untilMessageAcknowledged(consumer, msg); + await untilMessageAcknowledged(consumer, msg.getRequiredId()); }); diff --git a/tests/tests/consuming-messages/test00016.test.ts b/tests/tests/consuming-messages/test00016.test.ts index 5a4ae69b..8f681563 100644 --- a/tests/tests/consuming-messages/test00016.test.ts +++ b/tests/tests/consuming-messages/test00016.test.ts @@ -21,7 +21,7 @@ test('Consume message from different queues using a single consumer instance: ca const consumer = promisifyAll(new Consumer()); await consumer.consumeAsync('test_queue', (msg, cb) => { - cb(); + setTimeout(cb, 1000); }); await consumer.consumeAsync('another_queue', (msg, cb) => { cb(); @@ -32,16 +32,12 @@ test('Consume message from different queues using a single consumer instance: ca await producer.runAsync(); const msg1 = new Message().setQueue('test_queue').setBody('some data'); - setTimeout(() => { - producer.produceAsync(msg1); - }, 1000); - await untilMessageAcknowledged(consumer, msg1); + const { messages } = await producer.produceAsync(msg1); + await untilMessageAcknowledged(consumer, messages[0]); const msg2 = new Message().setQueue('another_queue').setBody('some data'); - setTimeout(() => { - producer.produceAsync(msg2); - }, 1000); - await untilMessageAcknowledged(consumer, msg2); + const { messages: m } = await producer.produceAsync(msg2); + await untilMessageAcknowledged(consumer, m[0]); await shutDownBaseInstance(consumer); }); diff --git a/tests/tests/consuming-messages/test00018.test.ts b/tests/tests/consuming-messages/test00018.test.ts index 531a75f1..822498b4 100644 --- a/tests/tests/consuming-messages/test00018.test.ts +++ b/tests/tests/consuming-messages/test00018.test.ts @@ -10,14 +10,18 @@ import { createQueue, defaultQueue, - produceMessage, } from '../../common/message-producing-consuming'; import { ProducerMessageAlreadyPublishedError } from '../../../src/lib/producer/errors'; +import { Message } from '../../../src/lib/message/message'; +import { getProducer } from '../../common/producer'; test('Producing duplicate message', async () => { await createQueue(defaultQueue, false); - const { producer, message } = await produceMessage(); + const m = new Message().setQueue(defaultQueue).setBody('123'); + const p = await getProducer(); + await p.runAsync(); + await p.produceAsync(m); await expect(async () => { - await producer.produceAsync(message); + await p.produceAsync(m); }).rejects.toThrow(ProducerMessageAlreadyPublishedError); }); diff --git a/tests/tests/deleting-messages/test00001.test.ts b/tests/tests/deleting-messages/test00001.test.ts index 5dde8c17..9c013a2f 100644 --- a/tests/tests/deleting-messages/test00001.test.ts +++ b/tests/tests/deleting-messages/test00001.test.ts @@ -17,18 +17,18 @@ import { getQueuePendingMessages } from '../../common/queue-pending-messages'; test('Combined test: Delete a pending message. Check pending message. Check queue metrics.', async () => { await createQueue(defaultQueue, false); - const { queue, message } = await produceMessage(); + const { queue, messageId } = await produceMessage(); const pendingMessages = await getQueuePendingMessages(); const res1 = await pendingMessages.getMessagesAsync(queue, 0, 100); expect(res1.totalItems).toBe(1); - expect(res1.items[0].getId()).toBe(message.getRequiredId()); + expect(res1.items[0].getId()).toBe(messageId); const count = await pendingMessages.countMessagesAsync(queue); expect(count).toBe(1); - await pendingMessages.deleteMessageAsync(queue, message.getRequiredId()); + await pendingMessages.deleteMessageAsync(queue, messageId); const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100); expect(res2.totalItems).toBe(0); diff --git a/tests/tests/deleting-messages/test00002.test.ts b/tests/tests/deleting-messages/test00002.test.ts index 0c064183..cc6abecf 100644 --- a/tests/tests/deleting-messages/test00002.test.ts +++ b/tests/tests/deleting-messages/test00002.test.ts @@ -17,18 +17,18 @@ import { getQueueMessages } from '../../common/queue-messages'; test('Combined test: Delete a pending message with priority. Check pending message. Check queue metrics.', async () => { await createQueue(defaultQueue, true); - const { message, queue } = await produceMessageWithPriority(); + const { messageId, queue } = await produceMessageWithPriority(); const pendingMessages = await getQueuePendingMessages(); const res1 = await pendingMessages.getMessagesAsync(queue, 0, 100); expect(res1.totalItems).toBe(1); - expect(res1.items[0].getId()).toBe(message.getRequiredId()); + expect(res1.items[0].getId()).toBe(messageId); const queueMessages = await getQueueMessages(); const count = await queueMessages.countMessagesByStatusAsync(queue); expect(count.pending).toBe(1); - await pendingMessages.deleteMessageAsync(queue, message.getRequiredId()); + await pendingMessages.deleteMessageAsync(queue, messageId); const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100); expect(res2.totalItems).toBe(0); expect(res2.items.length).toBe(0); diff --git a/tests/tests/deleting-messages/test00003.test.ts b/tests/tests/deleting-messages/test00003.test.ts index 51a193fc..4fd6a666 100644 --- a/tests/tests/deleting-messages/test00003.test.ts +++ b/tests/tests/deleting-messages/test00003.test.ts @@ -20,7 +20,7 @@ import { QueueMessageNotFoundError } from '../../../src/lib/queue/errors'; test('Combined test: Delete an acknowledged message. Check pending, acknowledged, and dead-letter message. Check queue metrics.', async () => { await createQueue(defaultQueue, false); - const { queue, message } = await produceAndAcknowledgeMessage(); + const { queue, messageId } = await produceAndAcknowledgeMessage(); const deadLetteredMessages = await getQueueDeadLetteredMessages(); const res0 = await deadLetteredMessages.countMessagesAsync(queue); expect(res0).toBe(0); @@ -37,14 +37,14 @@ test('Combined test: Delete an acknowledged message. Check pending, acknowledged const res3 = await acknowledgedMessages.getMessagesAsync(queue, 0, 100); expect(res3.totalItems).toBe(1); expect(res3.items.length).toBe(1); - expect(res3.items[0]).toEqual(message); + expect(res3.items[0].getRequiredId()).toEqual(messageId); const queueMessages = await getQueueMessages(); const count = await queueMessages.countMessagesByStatusAsync(queue); expect(count.pending).toBe(0); expect(count.acknowledged).toBe(1); - await acknowledgedMessages.deleteMessageAsync(queue, message.getRequiredId()); + await acknowledgedMessages.deleteMessageAsync(queue, messageId); const res4 = await acknowledgedMessages.getMessagesAsync(queue, 0, 100); expect(res4.totalItems).toBe(0); @@ -64,9 +64,6 @@ test('Combined test: Delete an acknowledged message. Check pending, acknowledged expect(count1.deadLettered).toBe(0); await expect(async () => { - await acknowledgedMessages.deleteMessageAsync( - queue, - message.getRequiredId(), - ); + await acknowledgedMessages.deleteMessageAsync(queue, messageId); }).rejects.toThrow(QueueMessageNotFoundError); }); diff --git a/tests/tests/deleting-messages/test00004.test.ts b/tests/tests/deleting-messages/test00004.test.ts index 1b3b339b..28de652f 100644 --- a/tests/tests/deleting-messages/test00004.test.ts +++ b/tests/tests/deleting-messages/test00004.test.ts @@ -20,7 +20,7 @@ import { QueueMessageNotFoundError } from '../../../src/lib/queue/errors'; test('Combined test: Delete a dead-letter message. Check pending, acknowledged, and dead-letter message. Check queue metrics.', async () => { await createQueue(defaultQueue, false); - const { queue, message } = await produceAndDeadLetterMessage(); + const { queue, messageId } = await produceAndDeadLetterMessage(); const deadLetteredMessages = await getQueueDeadLetteredMessages(); const res0 = await deadLetteredMessages.countMessagesAsync(queue); @@ -38,7 +38,7 @@ test('Combined test: Delete a dead-letter message. Check pending, acknowledged, const res3 = await deadLetteredMessages.getMessagesAsync(queue, 0, 100); expect(res3.totalItems).toBe(1); expect(res3.items.length).toBe(1); - expect(res3.items[0].getId()).toEqual(message.getRequiredId()); + expect(res3.items[0].getId()).toEqual(messageId); const queueMessages = await getQueueMessages(); const count = await queueMessages.countMessagesByStatusAsync(queue); @@ -46,7 +46,7 @@ test('Combined test: Delete a dead-letter message. Check pending, acknowledged, expect(count.acknowledged).toBe(0); expect(count.deadLettered).toBe(1); - await deadLetteredMessages.deleteMessageAsync(queue, message.getRequiredId()); + await deadLetteredMessages.deleteMessageAsync(queue, messageId); const res4 = await acknowledgedMessages.getMessagesAsync(queue, 0, 100); expect(res4.totalItems).toBe(0); @@ -66,9 +66,6 @@ test('Combined test: Delete a dead-letter message. Check pending, acknowledged, expect(count1.deadLettered).toBe(0); await expect(async () => { - await deadLetteredMessages.deleteMessageAsync( - queue, - message.getRequiredId(), - ); + await deadLetteredMessages.deleteMessageAsync(queue, messageId); }).rejects.toThrow(QueueMessageNotFoundError); }); diff --git a/tests/tests/event-listeners/test00001.test.ts b/tests/tests/event-listeners/test00001.test.ts index 7306989d..f7e371b8 100644 --- a/tests/tests/event-listeners/test00001.test.ts +++ b/tests/tests/event-listeners/test00001.test.ts @@ -8,10 +8,11 @@ */ import { - IRedisSMQConfig, + EConsumeMessageDeadLetterCause, IEventListener, - TEventListenerInitArgs, IQueueParams, + IRedisSMQConfig, + TEventListenerInitArgs, } from '../../../types'; import { ICallback } from 'redis-smq-common'; import { config } from '../../common/config'; @@ -30,7 +31,7 @@ import { Configuration } from '../../../src/config/configuration'; const consumerStats: Record< string, - { queue: IQueueParams; event: string; message: Message }[] + { queue: IQueueParams; event: string; messageId: string }[] > = {}; class TestConsumerEventListener implements IEventListener { @@ -39,20 +40,30 @@ class TestConsumerEventListener implements IEventListener { cb: ICallback, ) { consumerStats[instanceId] = []; - eventProvider.on(events.MESSAGE_ACKNOWLEDGED, (msg: Message) => { - consumerStats[instanceId].push({ - queue: msg.getDestinationQueue(), - event: events.MESSAGE_ACKNOWLEDGED, - message: msg, - }); - }); - eventProvider.on(events.MESSAGE_DEAD_LETTERED, (msg: Message) => { - consumerStats[instanceId].push({ - queue: msg.getDestinationQueue(), - event: events.MESSAGE_DEAD_LETTERED, - message: msg, - }); - }); + eventProvider.on( + events.MESSAGE_ACKNOWLEDGED, + (messageId: string, queue: IQueueParams) => { + consumerStats[instanceId].push({ + queue, + event: events.MESSAGE_ACKNOWLEDGED, + messageId, + }); + }, + ); + eventProvider.on( + events.MESSAGE_DEAD_LETTERED, + ( + _: EConsumeMessageDeadLetterCause, + messageId: string, + queue: IQueueParams, + ) => { + consumerStats[instanceId].push({ + queue, + event: events.MESSAGE_DEAD_LETTERED, + messageId, + }); + }, + ); cb(); } @@ -74,16 +85,16 @@ test('Consumer event listeners', async () => { Message.setDefaultConsumeOptions({ retryDelay: 0 }); await createQueue(defaultQueue, false); - const { message: m0, consumer: c0 } = + const { messageId: m0, consumer: c0 } = await produceAndAcknowledgeMessage(defaultQueue); await shutDownBaseInstance(c0); - const { message: m1, consumer: c1 } = + const { messageId: m1, consumer: c1 } = await produceAndAcknowledgeMessage(defaultQueue); await shutDownBaseInstance(c1); const anotherQueue = { name: 'another_queue', ns: 'testing' }; await createQueue(anotherQueue, false); const { - message: m2, + messageId: m2, consumer: c2, producer: p2, } = await produceAndDeadLetterMessage(anotherQueue); @@ -109,29 +120,29 @@ test('Consumer event listeners', async () => { expect(consumerStats[c0.getId()][0]).toEqual({ queue: defaultQueue, event: events.MESSAGE_ACKNOWLEDGED, - message: m0, + messageId: m0, }); expect(consumerStats[c1.getId()][0]).toEqual({ queue: defaultQueue, event: events.MESSAGE_ACKNOWLEDGED, - message: m1, + messageId: m1, }); expect(consumerStats[c2.getId()].length).toEqual(1); expect(consumerStats[c2.getId()][0].queue).toEqual(anotherQueue); expect(consumerStats[c2.getId()][0].event).toEqual( events.MESSAGE_DEAD_LETTERED, ); - expect(consumerStats[c2.getId()][0].message.getId()).toEqual(m2.getId()); + expect(consumerStats[c2.getId()][0].messageId).toEqual(m2); expect(consumerStats[c3.getId()].length).toEqual(2); expect(consumerStats[c3.getId()][0]).toEqual({ queue: anotherQueue, event: events.MESSAGE_ACKNOWLEDGED, - message: m3, + messageId: m3.getRequiredId(), }); expect(consumerStats[c3.getId()][1]).toEqual({ queue: anotherQueue, event: events.MESSAGE_ACKNOWLEDGED, - message: m4, + messageId: m4.getRequiredId(), }); }); diff --git a/tests/tests/event-listeners/test00002.test.ts b/tests/tests/event-listeners/test00002.test.ts index ec5e45b9..9748345d 100644 --- a/tests/tests/event-listeners/test00002.test.ts +++ b/tests/tests/event-listeners/test00002.test.ts @@ -23,16 +23,17 @@ import { } from '../../common/message-producing-consuming'; import { Configuration } from '../../../src/config/configuration'; -const producerStats: Record = {}; +const producerStats: Record = + {}; class TestProducerEventListener implements IEventListener { init(args: TEventListenerInitArgs, cb: ICallback) { const { instanceId, eventProvider } = args; producerStats[instanceId] = []; - eventProvider.on(events.MESSAGE_PUBLISHED, (msg: Message) => { + eventProvider.on(events.MESSAGE_PUBLISHED, (messageId: string) => { producerStats[instanceId].push({ event: events.MESSAGE_PUBLISHED, - message: msg, + messageId, }); }); cb(); @@ -71,19 +72,19 @@ test('Producer event listeners', async () => { expect(producerStats[p0.getId()].length).toEqual(2); expect(producerStats[p0.getId()][0]).toEqual({ event: events.MESSAGE_PUBLISHED, - message: m0, + messageId: m0.getRequiredId(), }); expect(producerStats[p0.getId()][1]).toEqual({ event: events.MESSAGE_PUBLISHED, - message: m1, + messageId: m1.getRequiredId(), }); expect(producerStats[p1.getId()].length).toEqual(2); expect(producerStats[p1.getId()][0]).toEqual({ event: events.MESSAGE_PUBLISHED, - message: m2, + messageId: m2.getRequiredId(), }); expect(producerStats[p1.getId()][1]).toEqual({ event: events.MESSAGE_PUBLISHED, - message: m3, + messageId: m3.getRequiredId(), }); }); diff --git a/tests/tests/exchanges/fanout-exchange/test00003.test.ts b/tests/tests/exchanges/fanout-exchange/test00003.test.ts index 3aa166b9..45f5ec22 100644 --- a/tests/tests/exchanges/fanout-exchange/test00003.test.ts +++ b/tests/tests/exchanges/fanout-exchange/test00003.test.ts @@ -13,6 +13,7 @@ import { isEqual } from '../../../common/util'; import { EQueueType } from '../../../../types'; import { getQueue } from '../../../common/queue'; import { getFanOutExchange } from '../../../common/exchange'; +import { getQueueMessages } from '../../../common/queue-messages'; test('ExchangeFanOut: producing message using setFanOut()', async () => { const q1 = { ns: 'testing', name: 'w123' }; @@ -33,9 +34,11 @@ test('ExchangeFanOut: producing message using setFanOut()', async () => { const r = await producer.produceAsync(msg); expect(r.scheduled).toEqual(false); + const messages = await getQueueMessages(); + const items = await messages.getMessagesByIdsAsync(r.messages); expect( isEqual( - r.messages.map((i) => i.getDestinationQueue()), + items.map((i) => i.getDestinationQueue()), [q1, q2], ), ).toBe(true); diff --git a/tests/tests/exchanges/fanout-exchange/test00004.test.ts b/tests/tests/exchanges/fanout-exchange/test00004.test.ts index 19f42096..9adfc1b4 100644 --- a/tests/tests/exchanges/fanout-exchange/test00004.test.ts +++ b/tests/tests/exchanges/fanout-exchange/test00004.test.ts @@ -14,6 +14,7 @@ import { isEqual } from '../../../common/util'; import { EQueueType } from '../../../../types'; import { getQueue } from '../../../common/queue'; import { getFanOutExchange } from '../../../common/exchange'; +import { getQueueMessages } from '../../../common/queue-messages'; test('ExchangeFanOut: producing message using setExchange()', async () => { const q1 = { ns: 'testing', name: 'w123' }; @@ -35,9 +36,11 @@ test('ExchangeFanOut: producing message using setExchange()', async () => { const r = await producer.produceAsync(msg); expect(r.scheduled).toEqual(false); + const messages = await getQueueMessages(); + const items = await messages.getMessagesByIdsAsync(r.messages); expect( isEqual( - r.messages.map((i) => i.getDestinationQueue()), + items.map((i) => i.getDestinationQueue()), [q1, q2], ), ).toBe(true); diff --git a/tests/tests/exchanges/topic-exchange/test00004.test.ts b/tests/tests/exchanges/topic-exchange/test00004.test.ts index b98e1361..10098af3 100644 --- a/tests/tests/exchanges/topic-exchange/test00004.test.ts +++ b/tests/tests/exchanges/topic-exchange/test00004.test.ts @@ -12,6 +12,7 @@ import { createQueue } from '../../../common/message-producing-consuming'; import { Message } from '../../../../src/lib/message/message'; import { getProducer } from '../../../common/producer'; import { isEqual } from '../../../common/util'; +import { getQueueMessages } from '../../../common/queue-messages'; test('ExchangeTopic: producing message using setExchange()', async () => { await createQueue({ ns: 'testing', name: 'w123.2.4.5' }, false); @@ -27,9 +28,11 @@ test('ExchangeTopic: producing message using setExchange()', async () => { const msg = new Message().setExchange(e1).setBody('hello'); const r = await producer.produceAsync(msg); expect(r.scheduled).toEqual(false); + const messages = await getQueueMessages(); + const items = await messages.getMessagesByIdsAsync(r.messages); expect( isEqual( - r.messages.map((i) => i.getDestinationQueue()), + items.map((i) => i.getDestinationQueue()), [ { ns: 'testing', name: 'w123.2.4.5.6' }, { ns: 'testing', name: 'w123.2.4.5' }, diff --git a/tests/tests/exchanges/topic-exchange/test00005.test.ts b/tests/tests/exchanges/topic-exchange/test00005.test.ts index f7947f05..4cffac77 100644 --- a/tests/tests/exchanges/topic-exchange/test00005.test.ts +++ b/tests/tests/exchanges/topic-exchange/test00005.test.ts @@ -11,6 +11,7 @@ import { createQueue } from '../../../common/message-producing-consuming'; import { Message } from '../../../../src/lib/message/message'; import { getProducer } from '../../../common/producer'; import { isEqual } from '../../../common/util'; +import { getQueueMessages } from '../../../common/queue-messages'; test('ExchangeTopic: producing message using setTopic()', async () => { await createQueue({ ns: 'testing', name: 'w123.2.4.5' }, false); @@ -25,9 +26,11 @@ test('ExchangeTopic: producing message using setTopic()', async () => { const msg = new Message().setTopic('w123.2.4').setBody('hello'); const r = await producer.produceAsync(msg); expect(r.scheduled).toEqual(false); + const messages = await getQueueMessages(); + const items = await messages.getMessagesByIdsAsync(r.messages); expect( isEqual( - r.messages.map((i) => i.getDestinationQueue()), + items.map((i) => i.getDestinationQueue()), [ { ns: 'testing', name: 'w123.2.4.5.6' }, { ns: 'testing', name: 'w123.2.4.5' }, diff --git a/tests/tests/queue-acknowledged-messages/test00001.test.ts b/tests/tests/queue-acknowledged-messages/test00001.test.ts index 3eff2a62..bd614350 100644 --- a/tests/tests/queue-acknowledged-messages/test00001.test.ts +++ b/tests/tests/queue-acknowledged-messages/test00001.test.ts @@ -39,7 +39,7 @@ test('Acknowledged message', async () => { messageHandler: (msg1, cb) => cb(), }); consumer.run(); - await untilMessageAcknowledged(consumer, msg); + await untilMessageAcknowledged(consumer, msg.getRequiredId()); const acknowledgedMessages = await getQueueAcknowledgedMessages(); const count = await acknowledgedMessages.countMessagesAsync(defaultQueue); diff --git a/tests/tests/queue-dead-lettered-messages/test00001.test.ts b/tests/tests/queue-dead-lettered-messages/test00001.test.ts index eb9c9a90..08d9e9e3 100644 --- a/tests/tests/queue-dead-lettered-messages/test00001.test.ts +++ b/tests/tests/queue-dead-lettered-messages/test00001.test.ts @@ -39,7 +39,7 @@ test('Queue dead-lettered message', async () => { messageHandler: (msg1, cb) => cb(new Error()), }); consumer.run(); - await untilMessageDeadLettered(consumer, msg); + await untilMessageDeadLettered(consumer, msg.getRequiredId()); const deadLetteredMessages = await getQueueDeadLetteredMessages(); const count = await deadLetteredMessages.countMessagesAsync(defaultQueue); diff --git a/tests/tests/queue-rate-limit/test00029.test.ts b/tests/tests/queue-rate-limit/test00029.test.ts index 65dc44c2..2d0fe9e6 100644 --- a/tests/tests/queue-rate-limit/test00029.test.ts +++ b/tests/tests/queue-rate-limit/test00029.test.ts @@ -50,11 +50,11 @@ test('Rate limit a queue without priority and check message rate', async () => { new Message().setBody('msg 6').setQueue(defaultQueue), ); - const messages: { ts: number; msg: Message }[] = []; + const messages: { ts: number; messageId: string }[] = []; const consumer = await getConsumer(); - consumer.on(events.MESSAGE_ACKNOWLEDGED, (msg: Message) => { - messages.push({ ts: Date.now(), msg }); + consumer.on(events.MESSAGE_ACKNOWLEDGED, (messageId: string) => { + messages.push({ ts: Date.now(), messageId }); }); await consumer.runAsync(); diff --git a/tests/tests/queue-rate-limit/test00030.test.ts b/tests/tests/queue-rate-limit/test00030.test.ts index 26283865..2f5952e5 100644 --- a/tests/tests/queue-rate-limit/test00030.test.ts +++ b/tests/tests/queue-rate-limit/test00030.test.ts @@ -68,13 +68,13 @@ test('Rate limit a priority queue and check message rate', async () => { .setPriority(Message.MessagePriority.HIGH), ); - const messages: { ts: number; msg: Message }[] = []; + const messages: { ts: number; messageId: string }[] = []; const consumer = await getConsumer(); await consumer.cancelAsync(defaultQueue); await consumer.consumeAsync(defaultQueue, (msg, cb) => cb()); - consumer.on(events.MESSAGE_ACKNOWLEDGED, (msg: Message) => { - messages.push({ ts: Date.now(), msg }); + consumer.on(events.MESSAGE_ACKNOWLEDGED, (messageId: string) => { + messages.push({ ts: Date.now(), messageId }); }); await consumer.runAsync(); diff --git a/tests/tests/queue-rate-limit/test00031.test.ts b/tests/tests/queue-rate-limit/test00031.test.ts index 4a30553d..a5798d20 100644 --- a/tests/tests/queue-rate-limit/test00031.test.ts +++ b/tests/tests/queue-rate-limit/test00031.test.ts @@ -28,11 +28,11 @@ test('Set a rate limit for a queue and consume message using many consumers', as interval: 10000, }); - const messages: { ts: number; msg: Message }[] = []; + const messages: { ts: number; messageId: string }[] = []; for (let i = 0; i < 6; i += 1) { const consumer = await getConsumer(); - consumer.on(events.MESSAGE_ACKNOWLEDGED, (msg: Message) => { - messages.push({ ts: Date.now(), msg }); + consumer.on(events.MESSAGE_ACKNOWLEDGED, (messageId: string) => { + messages.push({ ts: Date.now(), messageId }); }); await consumer.runAsync(); } diff --git a/tests/tests/requeuing-messages/test00001.test.ts b/tests/tests/requeuing-messages/test00001.test.ts index d4d3771e..91950ac7 100644 --- a/tests/tests/requeuing-messages/test00001.test.ts +++ b/tests/tests/requeuing-messages/test00001.test.ts @@ -19,20 +19,17 @@ import { getQueueMessages } from '../../common/queue-messages'; test('Combined test: Requeue a message from dead-letter queue. Check queue metrics.', async () => { await createQueue(defaultQueue, false); - const { message, queue, consumer } = await produceAndDeadLetterMessage(); + const { messageId, queue, consumer } = await produceAndDeadLetterMessage(); await shutDownBaseInstance(consumer); const deadLetteredMessages = await getQueueDeadLetteredMessages(); - await deadLetteredMessages.requeueMessageAsync( - queue, - message.getRequiredId(), - ); + await deadLetteredMessages.requeueMessageAsync(queue, messageId); const pendingMessages = await getQueuePendingMessages(); const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100); expect(res2.totalItems).toBe(1); expect(res2.items.length).toBe(1); - expect(res2.items[0].getId()).toEqual(message.getRequiredId()); + expect(res2.items[0].getId()).toEqual(messageId); const res3 = await deadLetteredMessages.getMessagesAsync(queue, 0, 100); expect(res3.totalItems).toBe(0); @@ -44,9 +41,6 @@ test('Combined test: Requeue a message from dead-letter queue. Check queue metri expect(count.pending).toBe(1); await expect(async () => { - await deadLetteredMessages.requeueMessageAsync( - queue, - message.getRequiredId(), - ); + await deadLetteredMessages.requeueMessageAsync(queue, messageId); }).not.toThrow(); }); diff --git a/tests/tests/requeuing-messages/test00002.test.ts b/tests/tests/requeuing-messages/test00002.test.ts index 42224443..e90dfea9 100644 --- a/tests/tests/requeuing-messages/test00002.test.ts +++ b/tests/tests/requeuing-messages/test00002.test.ts @@ -19,20 +19,17 @@ import { getQueueAcknowledgedMessages } from '../../common/queue-acknowledged-me test('Combined test. Requeue a message from acknowledged queue. Check queue metrics.', async () => { await createQueue(defaultQueue, false); - const { message, queue, consumer } = await produceAndAcknowledgeMessage(); + const { messageId, queue, consumer } = await produceAndAcknowledgeMessage(); await shutDownBaseInstance(consumer); const acknowledgedMessages = await getQueueAcknowledgedMessages(); - await acknowledgedMessages.requeueMessageAsync( - queue, - message.getRequiredId(), - ); + await acknowledgedMessages.requeueMessageAsync(queue, messageId); const pendingMessages = await getQueuePendingMessages(); const res2 = await pendingMessages.getMessagesAsync(queue, 0, 100); expect(res2.totalItems).toBe(1); expect(res2.items.length).toBe(1); - expect(res2.items[0].getId()).toEqual(message.getRequiredId()); + expect(res2.items[0].getId()).toEqual(messageId); const res3 = await acknowledgedMessages.getMessagesAsync(queue, 0, 100); expect(res3.totalItems).toBe(0); @@ -44,9 +41,6 @@ test('Combined test. Requeue a message from acknowledged queue. Check queue metr expect(count.pending).toBe(1); await expect(async () => { - await acknowledgedMessages.requeueMessageAsync( - queue, - message.getRequiredId(), - ); + await acknowledgedMessages.requeueMessageAsync(queue, messageId); }).not.toThrow(); });