Register the module
app.module.ts
Produce — @InjectQueue
mailer.service.ts
Consume — @Processor / @Process
emails.consumer.ts
The module discovers @Processor classes at onApplicationBootstrap, finds
the @Process method on each, and runs a long-running consume loop wired
to an AbortController via onApplicationShutdown, so graceful shutdown
works out of the box.
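The loop-and-abort wiring described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual implementation: `Message`, `Pull`, `Handler`, `consumeLoop`, and `ConsumerHost` are hypothetical names, and the pull function stands in for whatever the SDK uses to fetch a batch.

```typescript
// Hypothetical shapes for illustration; not the AsyncBase SDK's real types.
interface Message { id: string; body: unknown }
type Pull = (signal: AbortSignal) => Promise<Message[]>;
type Handler = (msg: Message) => Promise<void>;

// Resolves after `ms`, or early if the signal aborts first.
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    const onAbort = (): void => {
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Pulls batches until the signal aborts. The handler that is currently
// running is allowed to finish; no new handler starts after the abort.
async function consumeLoop(
  pull: Pull,
  handle: Handler,
  signal: AbortSignal,
  idlePollMs: number,
): Promise<void> {
  while (!signal.aborted) {
    const batch = await pull(signal);
    if (batch.length === 0) {
      await sleep(idlePollMs, signal); // back off between empty polls
      continue;
    }
    for (const msg of batch) {
      if (signal.aborted) break;
      try {
        await handle(msg);
      } catch {
        // A real consumer would nack the message here.
      }
    }
  }
}

// Mirrors the two Nest lifecycle hooks named in the text.
class ConsumerHost {
  private readonly controller = new AbortController();
  private readonly pull: Pull;
  private readonly handle: Handler;
  private readonly idlePollMs: number;
  private loop: Promise<void> | undefined;

  constructor(pull: Pull, handle: Handler, idlePollMs = 1000) {
    this.pull = pull;
    this.handle = handle;
    this.idlePollMs = idlePollMs;
  }

  // Start the loop once the application has bootstrapped.
  onApplicationBootstrap(): void {
    this.loop = consumeLoop(this.pull, this.handle, this.controller.signal, this.idlePollMs);
  }

  // Abort the loop and wait for the in-flight handler to settle.
  async onApplicationShutdown(): Promise<void> {
    this.controller.abort();
    await this.loop;
  }
}
```

Awaiting the loop promise in the shutdown hook is what makes the shutdown graceful in this sketch: the process does not exit until the current handler has returned.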
Options
AsyncBaseModule.forRootAsync
@Processor(queue, options)
| Option | Default | Purpose |
|---|---|---|
| group | `<queue>-workers` | Consumer group name |
| consumer | auto | Stable consumer id for XAUTOCLAIM |
| visibilitySeconds | 60 | Server-side visibility timeout |
| batchSize | 10 | Max messages per pull (1..100) |
| waitSeconds | 5 | Long-poll duration (0..20) |
| idlePollMs | 1000 | Sleep between empty polls |
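
To make the defaults and ranges in the table concrete, here is a small sketch of how they could be applied to a partial options object. The option names, defaults, and ranges come from the table; `ProcessorOptions`, `ResolvedProcessorOptions`, and `resolveProcessorOptions` are hypothetical helpers, and whether the module validates ranges on the client side is an assumption.

```typescript
// Option names mirror the table above; the types and function are hypothetical.
interface ProcessorOptions {
  group?: string;
  consumer?: string;
  visibilitySeconds?: number;
  batchSize?: number;
  waitSeconds?: number;
  idlePollMs?: number;
}

interface ResolvedProcessorOptions {
  group: string;
  consumer: string | undefined; // undefined stands for "auto"
  visibilitySeconds: number;
  batchSize: number;
  waitSeconds: number;
  idlePollMs: number;
}

function resolveProcessorOptions(
  queue: string,
  opts: ProcessorOptions = {},
): ResolvedProcessorOptions {
  const resolved: ResolvedProcessorOptions = {
    group: opts.group ?? `${queue}-workers`,
    consumer: opts.consumer,
    visibilitySeconds: opts.visibilitySeconds ?? 60,
    batchSize: opts.batchSize ?? 10,
    waitSeconds: opts.waitSeconds ?? 5,
    idlePollMs: opts.idlePollMs ?? 1000,
  };
  // Ranges as documented: batchSize 1..100, waitSeconds 0..20.
  if (!Number.isInteger(resolved.batchSize) || resolved.batchSize < 1 || resolved.batchSize > 100) {
    throw new RangeError('batchSize must be an integer in 1..100');
  }
  if (resolved.waitSeconds < 0 || resolved.waitSeconds > 20) {
    throw new RangeError('waitSeconds must be in 0..20');
  }
  return resolved;
}
```

For a queue named `emails` (an example name), `resolveProcessorOptions('emails', { batchSize: 25 })` keeps every default except `batchSize` and derives the group name `emails-workers`.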
Multiple queues
Graceful shutdown
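Enable shutdown hooks with app.enableShutdownHooks() on the app from
NestFactory.create(...) before calling listen() (or on the context from
createApplicationContext). SIGINT / SIGTERM then aborts in-flight
consume loops and waits for running handlers to finish.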
Error handling
Throwing from a @Process method auto-nacks the message. AsyncBase
applies exponential backoff per attempt server-side. Once the retries
are exhausted (default 3), the message lands in the dead-letter queue
(DLQ). Configure the retry count per send or per queue.
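
The delivery rule above can be modelled with a short simulation. This is only a sketch of the behaviour as described, not AsyncBase code: `deliver` and `Outcome` are hypothetical names, the backoff wait is skipped, and it assumes the retry count means redeliveries after the first attempt (the service may count differently).

```typescript
type Outcome = 'acked' | 'dead-lettered';

// Simulates what happens to a single message.
// Assumption: `maxRetries` counts redeliveries after the first attempt,
// so a handler that always throws runs maxRetries + 1 times in total.
async function deliver(
  handler: () => Promise<void>,
  maxRetries = 3,
): Promise<{ outcome: Outcome; attempts: number }> {
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      await handler();
      return { outcome: 'acked', attempts: attempt }; // normal return acks
    } catch {
      // A throw is treated as a nack. The real service would wait out an
      // exponentially growing delay before redelivering; omitted here.
    }
  }
  return { outcome: 'dead-lettered', attempts: maxRetries + 1 };
}
```

A handler that fails twice and then succeeds is acked on its third attempt under this model, while one that always throws ends up dead-lettered.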
Testing
Unit tests don’t need the module at all: instantiate EmailsConsumer
directly and call handle(mockMessage). For integration tests, point
the SDK at a test endpoint.
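
For the unit-test approach, a dependency-free sketch could look like the following. The `EmailsConsumer` shown here is a stand-in with an assumed constructor and message shape (`EmailJob`, `QueueMessage`, `Mailer`, and `FakeMailer` are all hypothetical); substitute your real consumer and whatever message type the SDK provides.

```typescript
// Hypothetical shapes; the real consumer and message type live in your app and the SDK.
interface EmailJob { to: string; subject: string }
interface QueueMessage<T> { id: string; body: T }
interface Mailer { send(to: string, subject: string): Promise<void> }

class EmailsConsumer {
  private readonly mailer: Mailer;

  constructor(mailer: Mailer) {
    this.mailer = mailer;
  }

  // In the real app this method would carry the @Process decorator.
  async handle(message: QueueMessage<EmailJob>): Promise<void> {
    if (!message.body.to) {
      throw new Error('missing recipient'); // a throw nacks the message
    }
    await this.mailer.send(message.body.to, message.body.subject);
  }
}

// Hand-rolled fake instead of a mocking library, to keep the sketch self-contained.
class FakeMailer implements Mailer {
  sent: Array<{ to: string; subject: string }> = [];

  async send(to: string, subject: string): Promise<void> {
    this.sent.push({ to, subject });
  }
}

// Builds the consumer directly, with no Nest module involved, and handles one message.
async function runUnitTest(): Promise<FakeMailer> {
  const mailer = new FakeMailer();
  const consumer = new EmailsConsumer(mailer);
  const mockMessage: QueueMessage<EmailJob> = {
    id: 'm1',
    body: { to: 'a@example.com', subject: 'Hi' },
  };
  await consumer.handle(mockMessage);
  return mailer;
}
```

Because the consumer is a plain class, the same pattern works in any test runner: assert on what the fake recorded, and assert that `handle` rejects for payloads that should be nacked.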