@nestjs-packages/sqs is a project to make SQS easier to use and control some required flows with NestJS. This module provides decorator-based message handling suited for simple use.
This library internally uses bbc/sqs-producer and bbc/sqs-consumer
$ npm i --save @nestjs-packages/sqs
# or
$ yarn add @nestjs-packages/sqs
For use SqsModule, You have to set SQS configurations. you can set SQS configurations by forRootAsync method. after forRootAsync method called, every sqs produers and consumers use SQS configurations returned by forRootAsync useFactory method
@Module({
imports: [
SqsModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService) => {
const config: SqsConfigOption = {
region: configService.region,
endpoint: configService.endpoint,
accountNumber: configService.accountNumber,
credentials: {
accessKeyId: configService.accessKeyId,
secretAccessKey: configService.secretAccessKey,
},
};
return new SqsConfig(config);
},
inject: [ConfigService],
}),
],
})
class AppModule {}
Second you have to register queues. register queues means create sqs-producer and sqs-consumer by queueOptions that passed into registerQueue parameter default type of queueOption is 'ALL'
SqsModule.registerQueue(
{
name: 'queueName',
type?: SqsQueueType.Consumer // 'ALL'|'CONSUMER'|'PRODUCER'
consumerOptions?: {},
producerOptions?: {}
},
...
);
You need to decorate providers and methods in your NestJS providers in order to have them be automatically attached as message(event) handlers for incoming SQS messages
@SqsProcess(/** name: */ queueName)
export class AppMessageHandler {
@SqsMessageHandler(/** batch: */ false)
public async handleMessage(message: AWS.SQS.Message) {}
@SqsConsumerEventHandler(/** eventName: */ SqsConsumerEvent.PROCESSING_ERROR)
public onProcessingError(error: Error, message: AWS.SQS.Message) {
// report errors here
}
}
You need to pass queueName to SqsProcess decorator. unless SqsModule won't register Consumer. One class can only handle one queue. so if you want to enroll dead letter queue, you have to make new class that handle dead letter queue
SqsService needs to be injected to produce the message.
@Injectable()
export class AppService {
public constructor(
private readonly sqsService: SqsService,
) { }
public async dispatchSomething() {
await this.sqsService.send<BodyType>(/** name: */ 'queueName', {
id: 'id',
body: { ... },
groupId: 'groupId',
deduplicationId: 'deduplicationId',
messageAttributes: { ... },
delaySeconds: 0,
});
}
}
See here, and note that we have same configuration as bbc/sqs-consumer's.
In most time you just need to specify both name
and queueUrl
at the minimum requirements.
This project is licensed under the terms of the MIT license.