Queue
A queue is the durable, distributed counterpart to a scheduled job. A
producer pushes a typed payload onto a named queue; a
consumer (a #[processor]) reads from the same name with its own
concurrency and retry policy. They are decoupled — different binaries,
different deploys, different replica counts.
Queues are backed by Redis through apalis and wrapped by the
nestrs-queue crate, so the framework owns the DI shape and the
discovery registry.
A durable consumer
This is the real consumer from apps/worker:

    use std::sync::Arc;

    use anyhow::Result;
    use nestrs_queue::{async_trait, processor, Processor};

    use crate::audio::dto::TranscodeJob;
    use crate::audio::service::Transcoder;

    #[processor(queue = "audio", concurrency = 5, retries = 3)]
    pub struct AudioProcessor {
        #[inject]
        transcoder: Arc<Transcoder>,
    }

    #[async_trait]
    impl Processor for AudioProcessor {
        type Job = TranscodeJob;

        async fn process(&self, job: TranscodeJob) -> Result<()> {
            self.transcoder.transcode(&job.file).await
        }
    }

- #[processor(queue, concurrency, retries)]: queue is the Redis queue name (stringly-typed, the known cost; there is no separately-typed Queue<TranscodeJob>). concurrency caps the in-flight jobs per worker. retries is the upper bound on retry attempts (apalis backoff applies).
- type Job = TranscodeJob: the typed payload. The framework serdes the job at the queue boundary; TranscodeJob is what your process method receives.
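To make the concurrency cap concrete, here is a minimal std-only sketch of what "at most N jobs in flight per worker" means. It is not how nestrs-queue or apalis implements the cap: the drain_with_cap function, the thread-per-slot design, and the in-memory channel standing in for Redis are all assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `jobs` on `concurrency` worker threads and returns
/// (jobs processed, highest number of jobs observed in flight at once).
pub fn drain_with_cap(jobs: Vec<String>, concurrency: usize) -> (usize, usize) {
    // An in-memory channel stands in for the Redis-backed queue.
    let (tx, rx) = mpsc::channel::<String>();
    for job in jobs {
        tx.send(job).expect("receiver is alive");
    }
    drop(tx); // closing the channel lets workers exit once it is empty

    let rx = Arc::new(Mutex::new(rx));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));

    // One thread per concurrency slot, so no more than `concurrency`
    // jobs can be running at the same moment.
    let handles: Vec<_> = (0..concurrency)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            let done = Arc::clone(&done);
            thread::spawn(move || loop {
                // The lock is held only while taking the next job.
                let next = rx.lock().unwrap().recv();
                let Ok(_file) = next else { break };
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // stand-in for real work
                in_flight.fetch_sub(1, Ordering::SeqCst);
                done.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    (done.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}
```

Calling drain_with_cap with 20 jobs and a cap of 5 processes all 20 while the observed peak never exceeds 5. The cap is per worker, so running three replicas of the same binary would allow up to 15 jobs in flight across the deployment.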
The payload
    use serde::{Deserialize, Serialize};

    pub const AUDIO_QUEUE: &str = "audio";

    #[derive(Clone, Debug, Deserialize, Serialize)]
    pub struct TranscodeJob {
        pub file: String,
    }

A job is any Serialize + DeserializeOwned + Send + 'static
type; no special trait is required. The convention is to keep payloads small
(just a reference to the work, not the work itself).
A producer
A producer is any code holding Arc<QueueConnection>. It can be a
scheduled job, an HTTP handler, an event-bus subscriber: anything in
the DI graph.

    use std::sync::Arc;

    use nestrs_queue::QueueConnection;

    #[injectable]
    pub struct UploadService {
        #[inject]
        queue: Arc<QueueConnection>,
    }

    impl UploadService {
        pub async fn enqueue(&self, file: String) -> anyhow::Result<()> {
            // Bind a typed publisher to the "audio" queue, then push the job.
            self.queue
                .of::<TranscodeJob>("audio")
                .push(TranscodeJob { file })
                .await?;
            Ok(())
        }
    }

queue.of::<T>(name) returns a typed publisher bound to the queue
name. The producer and consumer share the type by importing it from
the same crate; the queue name is the only stringly-typed coupling.
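Because the name is the one stringly-typed link, a typo on either side fails silently: the job lands on a queue nobody reads. The sketch below illustrates that failure mode and why spelling the name once, through the AUDIO_QUEUE constant from the payload module, helps. FakeBroker is a hypothetical in-memory stand-in, not part of nestrs-queue, and the payload here omits the serde derives to stay dependency-free.

```rust
use std::collections::{HashMap, VecDeque};

/// Shared by producer and consumer so the name is spelled in one place.
pub const AUDIO_QUEUE: &str = "audio";

#[derive(Clone, Debug, PartialEq)]
pub struct TranscodeJob {
    pub file: String,
}

/// Hypothetical in-memory stand-in for the Redis-backed queues, keyed by name.
#[derive(Default)]
pub struct FakeBroker {
    queues: HashMap<String, VecDeque<TranscodeJob>>,
}

impl FakeBroker {
    /// Appends a job to the named queue, creating the queue if needed.
    pub fn push(&mut self, queue: &str, job: TranscodeJob) {
        self.queues
            .entry(queue.to_string())
            .or_default()
            .push_back(job);
    }

    /// Takes the oldest job from the named queue, if there is one.
    pub fn pop(&mut self, queue: &str) -> Option<TranscodeJob> {
        self.queues.get_mut(queue)?.pop_front()
    }
}
```

Pushing to a misspelled name such as "audoi" succeeds, yet a consumer popping from AUDIO_QUEUE sees nothing. Passing AUDIO_QUEUE on both sides turns the same typo into a compile error.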
Wire it in
The consumer side runs in its own binary (or alongside others). Mount
the QueueWorker transport in main and import
QueueModule::for_root:

    use nestrs_core::module;
    use nestrs_queue::QueueModule;
    use crate::audio::AudioModule;

    #[module(
        imports = [
            QueueModule::for_root(None), // reads NESTRS_QUEUE__URL
            AudioModule,
        ],
    )]
    pub struct AppModule;

    use nestrs_core::App;
    use nestrs_queue::QueueWorker;
    use worker::AppModule;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        App::builder()
            .module::<AppModule>()
            .build()
            .await?
            .transport(QueueWorker::new()) // ← drains the inventory of processors
            .run()
            .await
    }

A producer-only binary (e.g. apps/api pushing jobs without
consuming them) imports QueueModule::for_root to gain
QueueConnection, but does not mount QueueWorker, since it has no
processors to run.
Run it
    $ just dev worker
    2026-06-03T10:18:41Z INFO nestrs::queue: registered 1 processor AudioProcessor • queue=audio concurrency=5 retries=3
    2026-06-03T10:18:46Z INFO worker::audio: queued transcode job file=track-1717405126521.mp3
    2026-06-03T10:18:46Z INFO worker::audio: transcoding file=track-1717405126521.mp3
    2026-06-03T10:18:46Z INFO nestrs::queue: job ok queue=audio elapsed=42ms

Producer and consumer can run in the same binary (here worker does
both, via Scheduler + QueueWorker), or split across deploys with the
producer in apps/api and the consumer in apps/worker. The Redis
queue is the only thing they share.
Retries and failure
    #[processor(queue = "audio", concurrency = 5, retries = 3)]

A processor that returns Err is retried by apalis with backoff
up to retries times. After exhaustion, the job is moved to the
queue’s failed list (apalis-specific). A panic in process is
treated as a failure.
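The retry behavior described above can be sketched as a plain loop. This is an illustration under stated assumptions, not apalis's implementation: run_with_retries, Outcome, and backoff are hypothetical names, the doubling delay is an assumed schedule, and the sketch assumes retries counts attempts after the first (so retries = 3 allows up to four calls to process).

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// The job succeeded on the given attempt (1-based).
    Done { attempts: u32 },
    /// Every attempt failed; a real queue would move the job to its failed list.
    Failed { attempts: u32 },
}

/// Delay before retry number `retry` (1-based): base * 2^(retry - 1).
/// The doubling schedule is an assumption, not the apalis default.
pub fn backoff(base: Duration, retry: u32) -> Duration {
    base.saturating_mul(2u32.saturating_pow(retry.saturating_sub(1)))
}

/// Calls `process` once, then up to `retries` more times while it keeps failing.
pub fn run_with_retries<F>(retries: u32, base: Duration, mut process: F) -> Outcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    let max_attempts = retries.saturating_add(1); // first attempt plus retries
    for attempt in 1..=max_attempts {
        // A panic inside `process` is caught and counted as a failed attempt.
        let result = catch_unwind(AssertUnwindSafe(|| process(attempt)));
        if let Ok(Ok(())) = result {
            return Outcome::Done { attempts: attempt };
        }
        // Wait before the next attempt, but not after the last one.
        if attempt < max_attempts {
            thread::sleep(backoff(base, attempt));
        }
    }
    Outcome::Failed { attempts: max_attempts }
}
```

With retries = 3, a job that always returns Err ends as Outcome::Failed after four attempts, while a job that fails twice and then succeeds ends as Outcome::Done on attempt three. Because a job may run more than once, process is best written to be idempotent.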
The ambient data context
Like a scheduled job, a processor runs as system work:
- A pool executor is installed (importing DatabaseModule is enough); Repo reads go through the pool.
- No ability is installed; Repo reads are unscoped, which is correct for system work.
Lean binaries
A worker imports no HTTP crate. Its release binary doesn’t link
the poem server, the controllers, the OpenAPI doc, or the GraphQL
schema. The features crate is shared, but module-gated discovery
keeps the unused providers inert.

    $ ls -lh target/release/worker target/release/api
    -rwxr-xr-x 18M worker
    -rwxr-xr-x 41M api

(Orders of magnitude shown for illustration; actual sizes depend on features.)
Going further
- Schedule: #[cron_job] producers that push onto a queue on a timer.
- Observability: every queue span targets nestrs::queue with structured queue, job_id, elapsed_ms, attempt fields.
Reference
- apps/worker/src/audio/processor.rs: the consumer.
- apps/worker/src/audio/producer.rs: a scheduled producer.
- crates/nestrs-queue/: #[processor], QueueConnection, QueueWorker, the Processor trait.