From fdb39d66f3cea59decf8ad29b5efdf827780ba73 Mon Sep 17 00:00:00 2001 From: Weyoss Date: Mon, 29 Apr 2024 12:04:12 +0200 Subject: [PATCH] fix(message): correct logic for deleting multiple messages by IDs --- src/lib/message/_/_delete-message.ts | 43 ++++++-------- tests/common/message-producing-consuming.ts | 14 +++++ .../tests/deleting-messages/test00006.test.ts | 44 ++++++++++++++ .../tests/deleting-messages/test00007.test.ts | 58 +++++++++++++++++++ .../tests/deleting-messages/test00008.test.ts | 58 +++++++++++++++++++ .../tests/deleting-messages/test00009.test.ts | 45 ++++++++++++++ .../tests/deleting-messages/test00010.test.ts | 44 ++++++++++++++ 7 files changed, 282 insertions(+), 24 deletions(-) create mode 100644 tests/tests/deleting-messages/test00006.test.ts create mode 100644 tests/tests/deleting-messages/test00007.test.ts create mode 100644 tests/tests/deleting-messages/test00008.test.ts create mode 100644 tests/tests/deleting-messages/test00009.test.ts create mode 100644 tests/tests/deleting-messages/test00010.test.ts diff --git a/src/lib/message/_/_delete-message.ts b/src/lib/message/_/_delete-message.ts index 35a1a476..2b4bd22d 100644 --- a/src/lib/message/_/_delete-message.ts +++ b/src/lib/message/_/_delete-message.ts @@ -28,6 +28,24 @@ export function _deleteMessage( const keys: string[] = []; const argv: (string | number)[] = []; const ids = typeof messageId === 'string' ? [messageId] : messageId; + const { keyScheduledMessages, keyDelayedMessages, keyRequeueMessages } = + redisKeys.getMainKeys(); + keys.push(keyScheduledMessages, keyDelayedMessages, keyRequeueMessages); + argv.push( + EQueueProperty.QUEUE_TYPE, + EQueueProperty.MESSAGES_COUNT, + EQueueType.PRIORITY_QUEUE, + EQueueType.LIFO_QUEUE, + EQueueType.FIFO_QUEUE, + EMessageProperty.STATUS, + EMessagePropertyStatus.PROCESSING, + EMessagePropertyStatus.ACKNOWLEDGED, + EMessagePropertyStatus.PENDING, + EMessagePropertyStatus.SCHEDULED, + EMessagePropertyStatus.DEAD_LETTERED, + EMessagePropertyStatus.UNACK_DELAYING, + EMessagePropertyStatus.UNACK_REQUEUING, + ); async.each( ids, (id, _, done) => { @@ -46,16 +64,8 @@ export function _deleteMessage( message.getDestinationQueue(), message.getConsumerGroupId(), ); - const { - keyScheduledMessages, - keyDelayedMessages, - keyRequeueMessages, - } = redisKeys.getMainKeys(); const { keyMessage } = redisKeys.getMessageKeys(id); keys.push( - keyScheduledMessages, - keyDelayedMessages, - keyRequeueMessages, keyMessage, keyQueueProperties, keyQueuePending, @@ -64,22 +74,7 @@ export function _deleteMessage( keyQueueScheduled, keyQueuePriorityPending, ); - argv.push( - EQueueProperty.QUEUE_TYPE, - EQueueProperty.MESSAGES_COUNT, - EQueueType.PRIORITY_QUEUE, - EQueueType.LIFO_QUEUE, - EQueueType.FIFO_QUEUE, - EMessageProperty.STATUS, - EMessagePropertyStatus.PROCESSING, - EMessagePropertyStatus.ACKNOWLEDGED, - EMessagePropertyStatus.PENDING, - EMessagePropertyStatus.SCHEDULED, - EMessagePropertyStatus.DEAD_LETTERED, - EMessagePropertyStatus.UNACK_DELAYING, - EMessagePropertyStatus.UNACK_REQUEUING, - id, - ); + argv.push(id); done(); } }); diff --git a/tests/common/message-producing-consuming.ts b/tests/common/message-producing-consuming.ts index bb6234ea..bd7f54f5 100644 --- a/tests/common/message-producing-consuming.ts +++ b/tests/common/message-producing-consuming.ts @@ -34,6 +34,7 @@ export const defaultQueue: IQueueParams = { export async function produceAndAcknowledgeMessage( queue: IQueueParams = defaultQueue, + autoShutdown = false, ) { const producer = getProducer(); await producer.runAsync(); @@ -49,11 +50,18 @@ export async function produceAndAcknowledgeMessage( consumer.run(() => void 0); await untilMessageAcknowledged(consumer); + + if (autoShutdown) { + await producer.shutdownAsync(); + await consumer.shutdownAsync(); + } + return { producer, consumer, queue, messageId }; } export async function produceAndDeadLetterMessage( queue: IQueueParams = defaultQueue, + autoShutdown = false, ) { const producer = getProducer(); await producer.runAsync(); @@ -71,6 +79,12 @@ export async function produceAndDeadLetterMessage( consumer.run(() => void 0); await untilMessageDeadLettered(consumer); + + if (autoShutdown) { + await producer.shutdownAsync(); + await consumer.shutdownAsync(); + } + return { producer, consumer, messageId, queue }; } diff --git a/tests/tests/deleting-messages/test00006.test.ts b/tests/tests/deleting-messages/test00006.test.ts new file mode 100644 index 00000000..3ae8eaba --- /dev/null +++ b/tests/tests/deleting-messages/test00006.test.ts @@ -0,0 +1,44 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { test, expect } from '@jest/globals'; +import { getMessage } from '../../common/message.js'; +import { + createQueue, + defaultQueue, + produceMessage, +} from '../../common/message-producing-consuming.js'; +import { getQueuePendingMessages } from '../../common/queue-pending-messages.js'; + +test('Combined test: Delete pending messages by IDs. Check pending messages. Check queue metrics.', async () => { + await createQueue(defaultQueue, false); + const { messageId: msg1 } = await produceMessage(); + const { messageId: msg2 } = await produceMessage(); + const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1)); + + const pendingMessages = await getQueuePendingMessages(); + + const res1 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100); + expect(res1.totalItems).toBe(2); + const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1)); + expect(items).toEqual(ids); + + const count = await pendingMessages.countMessagesAsync(defaultQueue); + expect(count).toBe(2); + + const message = await getMessage(); + await message.deleteMessagesByIdsAsync([msg1, msg2]); + + const res2 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100); + expect(res2.totalItems).toBe(0); + expect(res2.items.length).toBe(0); + + const count2 = await pendingMessages.countMessagesAsync(defaultQueue); + expect(count2).toBe(0); +}); diff --git a/tests/tests/deleting-messages/test00007.test.ts b/tests/tests/deleting-messages/test00007.test.ts new file mode 100644 index 00000000..6a023fe5 --- /dev/null +++ b/tests/tests/deleting-messages/test00007.test.ts @@ -0,0 +1,58 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { test, expect } from '@jest/globals'; +import { getMessage } from '../../common/message.js'; +import { + createQueue, + defaultQueue, + produceAndAcknowledgeMessage, +} from '../../common/message-producing-consuming.js'; +import { getQueueAcknowledgedMessages } from '../../common/queue-acknowledged-messages.js'; + +test('Combined test: Delete acknowledged messages by IDs. Check acknowledged messages. Check queue metrics.', async () => { + await createQueue(defaultQueue, false); + const { messageId: msg1 } = await produceAndAcknowledgeMessage( + defaultQueue, + true, + ); + const { messageId: msg2 } = await produceAndAcknowledgeMessage( + defaultQueue, + true, + ); + const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1)); + + const acknowledgedMessages = await getQueueAcknowledgedMessages(); + + const res1 = await acknowledgedMessages.getMessagesAsync( + defaultQueue, + 0, + 100, + ); + expect(res1.totalItems).toBe(2); + const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1)); + expect(items).toEqual(ids); + + const count = await acknowledgedMessages.countMessagesAsync(defaultQueue); + expect(count).toBe(2); + + const message = await getMessage(); + await message.deleteMessagesByIdsAsync([msg1, msg2]); + + const res2 = await acknowledgedMessages.getMessagesAsync( + defaultQueue, + 0, + 100, + ); + expect(res2.totalItems).toBe(0); + expect(res2.items.length).toBe(0); + + const count2 = await acknowledgedMessages.countMessagesAsync(defaultQueue); + expect(count2).toBe(0); +}); diff --git a/tests/tests/deleting-messages/test00008.test.ts b/tests/tests/deleting-messages/test00008.test.ts new file mode 100644 index 00000000..5ef31965 --- /dev/null +++ b/tests/tests/deleting-messages/test00008.test.ts @@ -0,0 +1,58 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { test, expect } from '@jest/globals'; +import { getMessage } from '../../common/message.js'; +import { + createQueue, + defaultQueue, + produceAndDeadLetterMessage, +} from '../../common/message-producing-consuming.js'; +import { getQueueDeadLetteredMessages } from '../../common/queue-dead-lettered-messages.js'; + +test('Combined test: Delete dead-lettered messages by IDs. Check dead-lettered messages. Check queue metrics.', async () => { + await createQueue(defaultQueue, false); + const { messageId: msg1 } = await produceAndDeadLetterMessage( + defaultQueue, + true, + ); + const { messageId: msg2 } = await produceAndDeadLetterMessage( + defaultQueue, + true, + ); + const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1)); + + const deadLetteredMessages = await getQueueDeadLetteredMessages(); + + const res1 = await deadLetteredMessages.getMessagesAsync( + defaultQueue, + 0, + 100, + ); + expect(res1.totalItems).toBe(2); + const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1)); + expect(items).toEqual(ids); + + const count = await deadLetteredMessages.countMessagesAsync(defaultQueue); + expect(count).toBe(2); + + const message = await getMessage(); + await message.deleteMessagesByIdsAsync([msg1, msg2]); + + const res2 = await deadLetteredMessages.getMessagesAsync( + defaultQueue, + 0, + 100, + ); + expect(res2.totalItems).toBe(0); + expect(res2.items.length).toBe(0); + + const count2 = await deadLetteredMessages.countMessagesAsync(defaultQueue); + expect(count2).toBe(0); +}); diff --git a/tests/tests/deleting-messages/test00009.test.ts b/tests/tests/deleting-messages/test00009.test.ts new file mode 100644 index 00000000..64b2203b --- /dev/null +++ b/tests/tests/deleting-messages/test00009.test.ts @@ -0,0 +1,45 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { test, expect } from '@jest/globals'; +import { getMessage } from '../../common/message.js'; +import { + createQueue, + defaultQueue, + scheduleMessage, +} from '../../common/message-producing-consuming.js'; +import { getQueueScheduledMessages } from '../../common/queue-scheduled-messages.js'; + +test('Combined test: Delete scheduled messages by IDs. Check scheduled messages. Check queue metrics.', async () => { + await createQueue(defaultQueue, false); + + const { messageId: msg1 } = await scheduleMessage(); + const { messageId: msg2 } = await scheduleMessage(); + const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1)); + + const scheduledMessages = await getQueueScheduledMessages(); + + const res1 = await scheduledMessages.getMessagesAsync(defaultQueue, 0, 100); + expect(res1.totalItems).toBe(2); + const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1)); + expect(items).toEqual(ids); + + const count = await scheduledMessages.countMessagesAsync(defaultQueue); + expect(count).toBe(2); + + const message = await getMessage(); + await message.deleteMessagesByIdsAsync([msg1, msg2]); + + const res2 = await scheduledMessages.getMessagesAsync(defaultQueue, 0, 100); + expect(res2.totalItems).toBe(0); + expect(res2.items.length).toBe(0); + + const count2 = await scheduledMessages.countMessagesAsync(defaultQueue); + expect(count2).toBe(0); +}); diff --git a/tests/tests/deleting-messages/test00010.test.ts b/tests/tests/deleting-messages/test00010.test.ts new file mode 100644 index 00000000..c870139c --- /dev/null +++ b/tests/tests/deleting-messages/test00010.test.ts @@ -0,0 +1,44 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { expect, test } from '@jest/globals'; +import { EQueueType } from '../../../src/lib/queue/types/queue.js'; +import { + createQueue, + defaultQueue, + produceMessageWithPriority, +} from '../../common/message-producing-consuming.js'; +import { getMessage } from '../../common/message.js'; +import { getQueuePendingMessages } from '../../common/queue-pending-messages.js'; + +test('Combined test: Delete pending priority messages by IDs. Check pending priority messages. Check queue metrics.', async () => { + await createQueue(defaultQueue, EQueueType.PRIORITY_QUEUE); + const { messageId: msg1 } = await produceMessageWithPriority(); + const { messageId: msg2 } = await produceMessageWithPriority(); + const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1)); + + const pendingMessages = await getQueuePendingMessages(); + + const res1 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100); + expect(res1.totalItems).toBe(2); + const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1)); + expect(items).toEqual(ids); + const count = await pendingMessages.countMessagesAsync(defaultQueue); + expect(count).toBe(2); + + const message = await getMessage(); + await message.deleteMessagesByIdsAsync([msg1, msg2]); + + const res2 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100); + expect(res2.totalItems).toBe(0); + expect(res2.items.length).toBe(0); + + const count2 = await pendingMessages.countMessagesAsync(defaultQueue); + expect(count2).toBe(0); +});