Skip to content

Commit

Permalink
fix(message): correct logic for deleting multiple messages by IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Apr 29, 2024
1 parent ff9ff3d commit fdb39d6
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 24 deletions.
43 changes: 19 additions & 24 deletions src/lib/message/_/_delete-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ export function _deleteMessage(
const keys: string[] = [];
const argv: (string | number)[] = [];
const ids = typeof messageId === 'string' ? [messageId] : messageId;
const { keyScheduledMessages, keyDelayedMessages, keyRequeueMessages } =
redisKeys.getMainKeys();
keys.push(keyScheduledMessages, keyDelayedMessages, keyRequeueMessages);
argv.push(
EQueueProperty.QUEUE_TYPE,
EQueueProperty.MESSAGES_COUNT,
EQueueType.PRIORITY_QUEUE,
EQueueType.LIFO_QUEUE,
EQueueType.FIFO_QUEUE,
EMessageProperty.STATUS,
EMessagePropertyStatus.PROCESSING,
EMessagePropertyStatus.ACKNOWLEDGED,
EMessagePropertyStatus.PENDING,
EMessagePropertyStatus.SCHEDULED,
EMessagePropertyStatus.DEAD_LETTERED,
EMessagePropertyStatus.UNACK_DELAYING,
EMessagePropertyStatus.UNACK_REQUEUING,
);
async.each(
ids,
(id, _, done) => {
Expand All @@ -46,16 +64,8 @@ export function _deleteMessage(
message.getDestinationQueue(),
message.getConsumerGroupId(),
);
const {
keyScheduledMessages,
keyDelayedMessages,
keyRequeueMessages,
} = redisKeys.getMainKeys();
const { keyMessage } = redisKeys.getMessageKeys(id);
keys.push(
keyScheduledMessages,
keyDelayedMessages,
keyRequeueMessages,
keyMessage,
keyQueueProperties,
keyQueuePending,
Expand All @@ -64,22 +74,7 @@ export function _deleteMessage(
keyQueueScheduled,
keyQueuePriorityPending,
);
argv.push(
EQueueProperty.QUEUE_TYPE,
EQueueProperty.MESSAGES_COUNT,
EQueueType.PRIORITY_QUEUE,
EQueueType.LIFO_QUEUE,
EQueueType.FIFO_QUEUE,
EMessageProperty.STATUS,
EMessagePropertyStatus.PROCESSING,
EMessagePropertyStatus.ACKNOWLEDGED,
EMessagePropertyStatus.PENDING,
EMessagePropertyStatus.SCHEDULED,
EMessagePropertyStatus.DEAD_LETTERED,
EMessagePropertyStatus.UNACK_DELAYING,
EMessagePropertyStatus.UNACK_REQUEUING,
id,
);
argv.push(id);
done();
}
});
Expand Down
14 changes: 14 additions & 0 deletions tests/common/message-producing-consuming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export const defaultQueue: IQueueParams = {

export async function produceAndAcknowledgeMessage(
queue: IQueueParams = defaultQueue,
autoShutdown = false,
) {
const producer = getProducer();
await producer.runAsync();
Expand All @@ -49,11 +50,18 @@ export async function produceAndAcknowledgeMessage(

consumer.run(() => void 0);
await untilMessageAcknowledged(consumer);

if (autoShutdown) {
await producer.shutdownAsync();
await consumer.shutdownAsync();
}

return { producer, consumer, queue, messageId };
}

export async function produceAndDeadLetterMessage(
queue: IQueueParams = defaultQueue,
autoShutdown = false,
) {
const producer = getProducer();
await producer.runAsync();
Expand All @@ -71,6 +79,12 @@ export async function produceAndDeadLetterMessage(

consumer.run(() => void 0);
await untilMessageDeadLettered(consumer);

if (autoShutdown) {
await producer.shutdownAsync();
await consumer.shutdownAsync();
}

return { producer, consumer, messageId, queue };
}

Expand Down
44 changes: 44 additions & 0 deletions tests/tests/deleting-messages/test00006.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { test, expect } from '@jest/globals';
import { getMessage } from '../../common/message.js';
import {
createQueue,
defaultQueue,
produceMessage,
} from '../../common/message-producing-consuming.js';
import { getQueuePendingMessages } from '../../common/queue-pending-messages.js';

test('Combined test: Delete pending messages by IDs. Check pending messages. Check queue metrics.', async () => {
await createQueue(defaultQueue, false);
const { messageId: msg1 } = await produceMessage();
const { messageId: msg2 } = await produceMessage();
const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1));

const pendingMessages = await getQueuePendingMessages();

const res1 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res1.totalItems).toBe(2);
const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1));
expect(items).toEqual(ids);

const count = await pendingMessages.countMessagesAsync(defaultQueue);
expect(count).toBe(2);

const message = await getMessage();
await message.deleteMessagesByIdsAsync([msg1, msg2]);

const res2 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res2.totalItems).toBe(0);
expect(res2.items.length).toBe(0);

const count2 = await pendingMessages.countMessagesAsync(defaultQueue);
expect(count2).toBe(0);
});
58 changes: 58 additions & 0 deletions tests/tests/deleting-messages/test00007.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { test, expect } from '@jest/globals';
import { getMessage } from '../../common/message.js';
import {
createQueue,
defaultQueue,
produceAndAcknowledgeMessage,
} from '../../common/message-producing-consuming.js';
import { getQueueAcknowledgedMessages } from '../../common/queue-acknowledged-messages.js';

test('Combined test: Delete acknowledged messages by IDs. Check acknowledged messages. Check queue metrics.', async () => {
await createQueue(defaultQueue, false);
const { messageId: msg1 } = await produceAndAcknowledgeMessage(
defaultQueue,
true,
);
const { messageId: msg2 } = await produceAndAcknowledgeMessage(
defaultQueue,
true,
);
const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1));

const acknowledgedMessages = await getQueueAcknowledgedMessages();

const res1 = await acknowledgedMessages.getMessagesAsync(
defaultQueue,
0,
100,
);
expect(res1.totalItems).toBe(2);
const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1));
expect(items).toEqual(ids);

const count = await acknowledgedMessages.countMessagesAsync(defaultQueue);
expect(count).toBe(2);

const message = await getMessage();
await message.deleteMessagesByIdsAsync([msg1, msg2]);

const res2 = await acknowledgedMessages.getMessagesAsync(
defaultQueue,
0,
100,
);
expect(res2.totalItems).toBe(0);
expect(res2.items.length).toBe(0);

const count2 = await acknowledgedMessages.countMessagesAsync(defaultQueue);
expect(count2).toBe(0);
});
58 changes: 58 additions & 0 deletions tests/tests/deleting-messages/test00008.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { test, expect } from '@jest/globals';
import { getMessage } from '../../common/message.js';
import {
createQueue,
defaultQueue,
produceAndDeadLetterMessage,
} from '../../common/message-producing-consuming.js';
import { getQueueDeadLetteredMessages } from '../../common/queue-dead-lettered-messages.js';

test('Combined test: Delete dead-lettered messages by IDs. Check dead-lettered messages. Check queue metrics.', async () => {
await createQueue(defaultQueue, false);
const { messageId: msg1 } = await produceAndDeadLetterMessage(
defaultQueue,
true,
);
const { messageId: msg2 } = await produceAndDeadLetterMessage(
defaultQueue,
true,
);
const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1));

const deadLetteredMessages = await getQueueDeadLetteredMessages();

const res1 = await deadLetteredMessages.getMessagesAsync(
defaultQueue,
0,
100,
);
expect(res1.totalItems).toBe(2);
const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1));
expect(items).toEqual(ids);

const count = await deadLetteredMessages.countMessagesAsync(defaultQueue);
expect(count).toBe(2);

const message = await getMessage();
await message.deleteMessagesByIdsAsync([msg1, msg2]);

const res2 = await deadLetteredMessages.getMessagesAsync(
defaultQueue,
0,
100,
);
expect(res2.totalItems).toBe(0);
expect(res2.items.length).toBe(0);

const count2 = await deadLetteredMessages.countMessagesAsync(defaultQueue);
expect(count2).toBe(0);
});
45 changes: 45 additions & 0 deletions tests/tests/deleting-messages/test00009.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { test, expect } from '@jest/globals';
import { getMessage } from '../../common/message.js';
import {
createQueue,
defaultQueue,
scheduleMessage,
} from '../../common/message-producing-consuming.js';
import { getQueueScheduledMessages } from '../../common/queue-scheduled-messages.js';

test('Combined test: Delete scheduled messages by IDs. Check scheduled messages. Check queue metrics.', async () => {
await createQueue(defaultQueue, false);

const { messageId: msg1 } = await scheduleMessage();
const { messageId: msg2 } = await scheduleMessage();
const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1));

const scheduledMessages = await getQueueScheduledMessages();

const res1 = await scheduledMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res1.totalItems).toBe(2);
const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1));
expect(items).toEqual(ids);

const count = await scheduledMessages.countMessagesAsync(defaultQueue);
expect(count).toBe(2);

const message = await getMessage();
await message.deleteMessagesByIdsAsync([msg1, msg2]);

const res2 = await scheduledMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res2.totalItems).toBe(0);
expect(res2.items.length).toBe(0);

const count2 = await scheduledMessages.countMessagesAsync(defaultQueue);
expect(count2).toBe(0);
});
44 changes: 44 additions & 0 deletions tests/tests/deleting-messages/test00010.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { expect, test } from '@jest/globals';
import { EQueueType } from '../../../src/lib/queue/types/queue.js';
import {
createQueue,
defaultQueue,
produceMessageWithPriority,
} from '../../common/message-producing-consuming.js';
import { getMessage } from '../../common/message.js';
import { getQueuePendingMessages } from '../../common/queue-pending-messages.js';

test('Combined test: Delete pending priority messages by IDs. Check pending priority messages. Check queue metrics.', async () => {
await createQueue(defaultQueue, EQueueType.PRIORITY_QUEUE);
const { messageId: msg1 } = await produceMessageWithPriority();
const { messageId: msg2 } = await produceMessageWithPriority();
const ids = [msg1, msg2].sort((a, b) => (a > b ? 1 : -1));

const pendingMessages = await getQueuePendingMessages();

const res1 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res1.totalItems).toBe(2);
const items = res1.items.map((i) => i.id).sort((a, b) => (a > b ? 1 : -1));
expect(items).toEqual(ids);
const count = await pendingMessages.countMessagesAsync(defaultQueue);
expect(count).toBe(2);

const message = await getMessage();
await message.deleteMessagesByIdsAsync([msg1, msg2]);

const res2 = await pendingMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res2.totalItems).toBe(0);
expect(res2.items.length).toBe(0);

const count2 = await pendingMessages.countMessagesAsync(defaultQueue);
expect(count2).toBe(0);
});

0 comments on commit fdb39d6

Please sign in to comment.