Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Aug 2, 2022
1 parent e89facd commit 573e340
Show file tree
Hide file tree
Showing 42 changed files with 101 additions and 22 deletions.
15 changes: 9 additions & 6 deletions tests/common/consumer-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import { Producer } from '../../src/lib/producer/producer';
import { Message } from '../../src/lib/message/message';

const producer = new Producer(config);
producer.produce(
new Message().setQueue(defaultQueue).setBody(123).setRetryDelay(0),
(err) => {
if (err) throw err;
},
);
producer.run((err) => {
if (err) throw err;
producer.produce(
new Message().setQueue(defaultQueue).setBody(123).setRetryDelay(0),
(err) => {
if (err) throw err;
},
);
});

const consumer = new Consumer(config);
consumer.consume(
Expand Down
15 changes: 12 additions & 3 deletions tests/common/message-producing-consuming.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { IConfig, TQueueParams } from '../../types';
import { Message } from '../../src/lib/message/message';
import { events } from '../../src/common/events/events';
import { promisifyAll } from 'bluebird';
import { untilConsumerEvent, untilMessageAcknowledged } from './events';
import { getConsumer } from './consumer';
import { getProducer } from './producer';
Expand All @@ -20,6 +19,8 @@ export async function produceAndAcknowledgeMessage(
cfg: IConfig = requiredConfig,
) {
const producer = getProducer(cfg);
await producer.runAsync();

const consumer = getConsumer({
cfg,
queue,
Expand All @@ -42,6 +43,8 @@ export async function produceAndDeadLetterMessage(
cfg: IConfig = requiredConfig,
) {
const producer = getProducer(cfg);
await producer.runAsync();

const consumer = getConsumer({
cfg,
queue,
Expand All @@ -64,6 +67,8 @@ export async function produceMessage(
cfg: IConfig = requiredConfig,
) {
const producer = getProducer(cfg);
await producer.runAsync();

const message = new Message();
message.setBody({ hello: 'world' }).setQueue(queue);
await producer.produceAsync(message);
Expand All @@ -74,7 +79,9 @@ export async function produceMessageWithPriority(
queue: TQueueParams = defaultQueue,
cfg: IConfig = requiredConfig,
) {
const producer = promisifyAll(getProducer(cfg));
const producer = getProducer(cfg);
await producer.runAsync();

const message = new Message();
message.setPriority(Message.MessagePriority.LOW).setQueue(queue);
await producer.produceAsync(message);
Expand All @@ -85,7 +92,9 @@ export async function scheduleMessage(
queue: TQueueParams = defaultQueue,
cfg: IConfig = requiredConfig,
) {
const producer = promisifyAll(getProducer(cfg));
const producer = getProducer(cfg);
await producer.runAsync();

const message = new Message();
message.setScheduledDelay(10000).setQueue(queue);
await producer.produceAsync(message);
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
test('Produce and consume 1 message', async () => {
await createQueue(defaultQueue, false);
const producer = getProducer();
await producer.runAsync();

const consumer = getConsumer({
messageHandler: (msg1, cb) => cb(),
});
Expand Down
1 change: 1 addition & 0 deletions tests/tests/consuming-messages/test00003.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ test('Produce and consume 100 messages', async () => {
await createQueue(defaultQueue, false);

const producer = getProducer();
await producer.runAsync();

const total = 100;
const publishedMsg: Message[] = [];
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00004.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ test('A message is dead-lettered and not delivered when messageTTL is exceeded',
await createQueue(defaultQueue, false);

const producer = getProducer();
await producer.runAsync();

const consumer = getConsumer();
const consume = jest.spyOn(consumer, 'consume');

Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00005.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ test('Setting default message TTL from configuration', async () => {
await createQueue(defaultQueue, false);

const producer = getProducer();
await producer.runAsync();

const consumer = getConsumer();
const consume = jest.spyOn(consumer, 'consume');

Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00006.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ test('A message is unacknowledged when messageConsumeTimeout is exceeded', async
await createQueue(defaultQueue, false);

const producer = getProducer();
await producer.runAsync();

let consumeCount = 0;
const consumer = getConsumer({
messageHandler: jest.fn((msg: unknown, cb: ICallback<void>) => {
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00007.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {

test('Unacknowledged messages are re-queued when messageRetryThreshold is not exceeded', async () => {
const producer = getProducer();
await producer.runAsync();

await createQueue(defaultQueue, false);

let callCount = 0;
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00008.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {

test('Async exceptions are caught when consuming a message', async () => {
const producer = getProducer();
await producer.runAsync();

await createQueue(defaultQueue, false);

let callCount = 0;
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00009.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
test('A message is dead-lettered when messageRetryThreshold is exceeded', async () => {
await createQueue(defaultQueue, false);
const producer = getProducer();
await producer.runAsync();

const consumer = getConsumer({
messageHandler: jest.fn(() => {
throw new Error('Explicit error');
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00011.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ test('Given many consumers, a message is delivered only to one consumer', async
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(msg);

/**
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00012.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ test('An unacknowledged message is delayed given messageRetryDelay > 0 and messa
.setRetryThreshold(5);

const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(msg);
consumer.run();

Expand Down
1 change: 1 addition & 0 deletions tests/tests/consuming-messages/test00013.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ test('Given many queues, a message is recovered from a consumer crash and re-que
await queueBConsumer.runAsync();

const producer = getProducer();
await producer.runAsync();

// Produce a message to QUEUE B
const anotherMsg = new Message();
Expand Down
1 change: 1 addition & 0 deletions tests/tests/consuming-messages/test00014.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { createQueue } from '../../common/message-producing-consuming';

test('Consume messages from different queues and published by a single producer instance', async () => {
const producer = getProducer();
await producer.runAsync();
for (let i = 0; i < 5; i += 1) {
const queue = `QuEue_${i}`;
await createQueue(queue, false);
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00016.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ test('Consume messages from different queues using a single consumer instance: c
await consumer.runAsync();

const producer = getProducer();
await producer.runAsync();

const msg1 = new Message().setQueue('test_queue').setBody('some data');
setTimeout(() => {
producer.produceAsync(msg1);
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00019.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ test('An unacknowledged message is dead-lettered and not delivered again, given
await createQueue(defaultQueue, false);

const producer = getProducer();
await producer.runAsync();

const consumer = getConsumer({
messageHandler: () => {
throw new Error();
Expand Down
1 change: 1 addition & 0 deletions tests/tests/consuming-messages/test00020.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getProducer } from '../../common/producer';

test('Producing a message without a message queue', async () => {
const producer = getProducer();
await producer.runAsync();

const msg = new Message();
msg.setBody({ hello: 'world' });
Expand Down
6 changes: 2 additions & 4 deletions tests/tests/consuming-messages/test00021.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import { Message } from '../../../index';
import { delay } from 'bluebird';
import { getProducer } from '../../common/producer';
import {
createQueue,
defaultQueue,
} from '../../common/message-producing-consuming';
import { ProducerNotRunningError } from '../../../src/lib/producer/errors/producer-not-running.error';

test('Shutdown a producer and try to produce a message', async () => {
const producer = getProducer();
await delay(5000);
await producer.shutdownAsync();
await createQueue(defaultQueue, false);

const msg = new Message();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);
await expect(async () => {
await producer.produceAsync(msg);
}).rejects.toThrow(`Producer ID ${producer.getId()} is not running`);
}).rejects.toThrowError(ProducerNotRunningError);
});
3 changes: 2 additions & 1 deletion tests/tests/consuming-messages/test00022.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ test('Shutdown a consumer when consuming a message with retryThreshold = 0: expe

const consumer = getConsumer({
messageHandler: jest.fn(() => {
consumer.shutdown();
setTimeout(() => consumer.shutdown(), 5000);
}),
});

Expand All @@ -23,6 +23,7 @@ test('Shutdown a consumer when consuming a message with retryThreshold = 0: expe
.setBody('message body')
.setQueue(defaultQueue);
const producer = getProducer();
await producer.runAsync();
await producer.produceAsync(msg);

consumer.run();
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00023.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ test('Periodic scheduled messages upon consume failures are dead-lettered withou
.setRetryThreshold(5)
.setQueue(defaultQueue);
const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(msg);

consumer.run();
Expand Down
4 changes: 3 additions & 1 deletion tests/tests/consuming-messages/test00029.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ test('Rate limit a queue without priority and check message rate', async () => {
interval: 10000,
});

const producer = await getProducer();
const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(
new Message().setBody('msg 1').setQueue(defaultQueue),
);
Expand Down
4 changes: 3 additions & 1 deletion tests/tests/consuming-messages/test00030.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ test('Rate limit a priority queue and check message rate', async () => {
interval: 10000,
});

const producer = await getProducer();
const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(
new Message()
.setBody('msg 1')
Expand Down
4 changes: 3 additions & 1 deletion tests/tests/consuming-messages/test00031.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ test('Set a rate limit for a queue and consume messages using many consumers', a
await consumer.runAsync();
}

const producer = await getProducer();
const producer = getProducer();
await producer.runAsync();

for (let i = 0; i < 100; i += 1) {
await producer.produceAsync(
new Message().setBody(`msg ${i}`).setQueue(defaultQueue),
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00033.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ test('Consume messages from different queues using a single consumer instance: c
]);

const producer = getProducer();
await producer.runAsync();

for (let i = 0; i < 5; i += 1) {
await producer.produceAsync(
new Message().setQueue(`test${i + 1}`).setBody(`body ${i + 1}`),
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00034.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ test('Consume messages from different queues using a single consumer instance: c
await consumer.runAsync();

const producer = getProducer();
await producer.runAsync();

for (let i = 0; i < 5; i += 1) {
await producer.produceAsync(
new Message().setQueue(defaultQueue).setBody(`body ${i + 1}`),
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/consuming-messages/test00035.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ test('Consume messages from different queues using a single consumer instance: c
});

const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(new Message().setQueue('test0').setBody('body'));

await delay(10000);
Expand Down
1 change: 1 addition & 0 deletions tests/tests/consuming-messages/test00036.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ test('Producing a message and expecting different kind of failures', async () =>
await qm.queue.createAsync('test1', true);

const producer = getProducer();
await producer.runAsync();

try {
const msg = new Message()
Expand Down
1 change: 1 addition & 0 deletions tests/tests/consuming-messages/test00037.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ test('Scheduling a message and expecting different kind of failures', async () =
await qm.queue.createAsync('test1', true);

const producer = getProducer();
await producer.runAsync();

try {
const msg = new Message()
Expand Down
6 changes: 4 additions & 2 deletions tests/tests/event-listeners/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ const cfg: IConfig = {

test('Producer event listeners', async () => {
await createQueue(defaultQueue, false);
const p0 = await getProducer(cfg);
const p0 = getProducer(cfg);
await p0.runAsync();
const m0 = new Message().setQueue(defaultQueue).setBody(123);
await p0.produceAsync(m0);
const m1 = new Message().setQueue(defaultQueue).setBody(123);
await p0.produceAsync(m1);
const p1 = await getProducer(cfg);
const p1 = getProducer(cfg);
await p1.runAsync();
const m2 = new Message().setQueue(defaultQueue).setBody(123);
await p1.produceAsync(m2);
const m3 = new Message().setQueue(defaultQueue).setBody(123);
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/misc/test00014.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { getProducer } from '../../common/producer';

test('Producer: isRunning, isGoingUp, isGoingDown, isUp, isDown', async () => {
const mProducer = getProducer();
await mProducer.runAsync();

expect(typeof mProducer.getId()).toBe('string');
if (mProducer.isGoingUp()) {
await new Promise((resolve) => {
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/priority-queuing/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ test('Priority queuing: case 2', async () => {
}),
);

const producer = promisifyAll(getProducer());
const producer = getProducer();
await producer.runAsync();

// message 1
const msg1 = new Message();
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/requeuing-messages/test00003.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ test('Combined test. Requeue a priority message from acknowledged queue. Check q
.setPriority(Message.MessagePriority.ABOVE_NORMAL);

const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(message);

consumer.run();
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/scheduling-messages/test00001.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ test('Schedule a message: DELAY', async () => {
.setQueue(defaultQueue); // seconds

const producer = getProducer();
await producer.runAsync();

await producer.produceAsync(msg);
const producedAt = Date.now();

Expand Down
2 changes: 2 additions & 0 deletions tests/tests/scheduling-messages/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ test('Schedule a message: messageManager.getScheduledMessages()', async () => {
await createQueue(defaultQueue, false);

const producer = getProducer();
await producer.runAsync();

const msg1 = new Message();
msg1.setScheduledDelay(30000);
msg1
Expand Down
Loading

0 comments on commit 573e340

Please sign in to comment.