Skip to main content
npm i @asyncbase/sdk @asyncbase/nestjs reflect-metadata
Works with NestJS 10 + 11.

Register the module

app.module.ts
import { Module } from "@nestjs/common"
import { AsyncBaseModule } from "@asyncbase/nestjs"
import { EmailsConsumer } from "./emails.consumer"
import { MailerService } from "./mailer.service"

@Module({
  imports: [
    AsyncBaseModule.forRoot({ apiKey: process.env.ASYNCBASE_KEY! }),
    AsyncBaseModule.registerQueue({ name: "emails" }),
  ],
  providers: [EmailsConsumer, MailerService],
})
export class AppModule {}

Produce — @InjectQueue

mailer.service.ts
import { Injectable } from "@nestjs/common"
import { AsyncBaseQueueProxy, InjectQueue } from "@asyncbase/nestjs"

@Injectable()
export class MailerService {
  constructor(
    @InjectQueue("emails") private readonly emails: AsyncBaseQueueProxy,
  ) {}

  async welcome(to: string) {
    await this.emails.send({ to, template: "welcome" })
  }
}

Consume — @Processor / @Process

emails.consumer.ts
import { Logger } from "@nestjs/common"
import { Processor, Process, type ConsumedMessage } from "@asyncbase/nestjs"

@Processor("emails", {
  group: "emails-workers",
  visibilitySeconds: 60,
  batchSize: 10,
  waitSeconds: 5,
})
export class EmailsConsumer {
  private readonly log = new Logger("EmailsConsumer")

  @Process()
  async handle(msg: ConsumedMessage): Promise<void> {
    this.log.log(`processing ${msg.id}`)
    await send(msg.payload)
    // return normally → auto-ack
    // throw → auto-nack (exp-backoff; DLQ after N retries)
  }
}
The module scans @Processor classes at onApplicationBootstrap, finds the @Process method, and runs a long-running consume loop wired to AbortController via onApplicationShutdown — graceful shutdown works out of the box.

Options

AsyncBaseModule.forRootAsync

AsyncBaseModule.forRootAsync({
  imports: [ConfigModule],
  useFactory: (config: ConfigService) => ({
    apiKey: config.getOrThrow("ASYNCBASE_KEY"),
    baseUrl: config.get("ASYNCBASE_BASE_URL"),
  }),
  inject: [ConfigService],
})

@Processor(queue, options)

OptionDefaultPurpose
group<queue>-workersConsumer group name
consumerautoStable consumer id for XAUTOCLAIM
visibilitySeconds60Server-side visibility timeout
batchSize10Max messages per pull (1..100)
waitSeconds5Long-poll duration (0..20)
idlePollMs1000Sleep between empty polls

Multiple queues

AsyncBaseModule.registerQueue(
  { name: "emails" },
  { name: "webhooks" },
  { name: "billing" },
)

Graceful shutdown

Before NestFactory.create(...).listen() (or createApplicationContext):
app.enableShutdownHooks()
Now SIGINT / SIGTERM aborts in-flight consume loops and awaits handlers to finish.

Error handling

Throwing from a @Process method auto-nacks the message. AsyncBase applies exponential backoff per attempt server-side. After retries (default 3), the message lands in the DLQ. Configure the retry count per-send or per-queue.

Testing

Unit tests don’t need the module at all — instantiate EmailsConsumer directly and call handle(mockMessage). For integration tests, point the SDK at a test endpoint:
AsyncBaseModule.forRoot({
  apiKey: "sk_test_fake",
  baseUrl: mockServerUrl,
})