Skip to content

Queue

A queue is the durable, distributed counterpart to a scheduled job. A producer pushes a typed payload onto a named queue; a consumer (a #[processor]) reads from the same name with its own concurrency and retry policy. They are decoupled — different binaries, different deploys, different replica counts.

Backed by Redis through apalis, wrapped by the nestrs-queue crate so the framework owns the DI shape and the discovery registry.

This is the real consumer from apps/worker:

apps/worker/src/audio/processor.rs
use std::sync::Arc;
use anyhow::Result;
use nestrs_queue::{async_trait, processor, Processor};
use crate::audio::dto::TranscodeJob;
use crate::audio::service::Transcoder;
#[processor(queue = "audio", concurrency = 5, retries = 3)]
pub struct AudioProcessor {
#[inject]
transcoder: Arc<Transcoder>,
}
#[async_trait]
impl Processor for AudioProcessor {
type Job = TranscodeJob;
async fn process(&self, job: TranscodeJob) -> Result<()> {
self.transcoder.transcode(&job.file).await
}
}
  • #[processor(queue, concurrency, retries)]queue is the Redis queue name (stringly-typed, the known cost — there is no separately-typed Queue<TranscodeJob>). concurrency caps the in-flight jobs per worker. retries is the upper bound on retry attempts (apalis backoff applies).
  • type Job = TranscodeJob — the typed payload. The framework serdes the job at the queue boundary; TranscodeJob is what your process method receives.
apps/worker/src/audio/dto.rs
use serde::{Deserialize, Serialize};
pub const AUDIO_QUEUE: &str = "audio";
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TranscodeJob {
pub file: String,
}

A job is any Serialize + DeserializeOwned + Send + 'static type — no special trait. The convention is to keep payloads small (just a reference to the work, not the work itself).

A producer is any code holding Arc<QueueConnection>. It can be a scheduled job, an HTTP handler, an event-bus subscriber — anything in the DI graph.

apps/worker/src/audio/producer.rs
use nestrs_queue::QueueConnection;
#[injectable]
pub struct UploadService {
#[inject]
queue: Arc<QueueConnection>,
}
impl UploadService {
pub async fn enqueue(&self, file: String) -> anyhow::Result<()> {
self.queue
.of::<TranscodeJob>("audio")
.push(TranscodeJob { file })
.await?;
Ok(())
}
}

queue.of::<T>(name) returns a typed publisher bound to the queue name. The producer and consumer share the type by importing it from the same crate — the queue name is the only stringly-typed coupling.

The consumer side runs in its own binary (or alongside others). Mount the QueueWorker transport in main and import QueueModule::for_root:

apps/worker/src/app.rs
use nestrs_core::module;
use nestrs_queue::QueueModule;
use crate::audio::AudioModule;
#[module(
imports = [
QueueModule::for_root(None), // reads NESTRS_QUEUE__URL
AudioModule,
],
)]
pub struct AppModule;
apps/worker/src/main.rs
use nestrs_core::App;
use nestrs_queue::QueueWorker;
use worker::AppModule;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
App::builder()
.module::<AppModule>()
.build()
.await?
.transport(QueueWorker::new()) // ← drains the inventory of processors
.run()
.await
}

A producer-only binary (e.g. apps/api pushing jobs without consuming them) imports QueueModule::for_root to gain QueueConnection, but does not mount QueueWorker — no processors to run.

Terminal window
$ just dev worker
Terminal window
2026-06-03T10:18:41Z INFO nestrs::queue: registered 1 processor
AudioProcessor • queue=audio concurrency=5 retries=3
2026-06-03T10:18:46Z INFO worker::audio: queued transcode job file=track-1717405126521.mp3
2026-06-03T10:18:46Z INFO worker::audio: transcoding file=track-1717405126521.mp3
2026-06-03T10:18:46Z INFO nestrs::queue: job ok queue=audio elapsed=42ms

Producer and consumer can run in the same binary (here worker does both, via Scheduler + QueueWorker), or split across deploys with the producer in apps/api and the consumer in apps/worker. The Redis queue is the only thing they share.

#[processor(queue = "audio", concurrency = 5, retries = 3)]

A processor that returns Err is retried by apalis with backoff up to retries times. After exhaustion, the job is moved to the queue’s failed list (apalis-specific). A panic in process is treated as a failure.

Like a scheduled job, a processor runs as system work:

  • A pool executor is installed (importing DatabaseModule is enough); Repo reads go through the pool.
  • No ability is installed; Repo reads are unscoped — correct for system work.

A worker imports no HTTP crate. Its release binary doesn’t link the poem server, the controllers, the OpenAPI doc, the GraphQL schema. The features crate is shared, but module-gated discovery keeps the unused providers inert.

Terminal window
$ ls -lh target/release/worker target/release/api
-rwxr-xr-x 18M worker
-rwxr-xr-x 41M api

(orders of magnitude shown for illustration — actual sizes depend on features.)

  • Schedule#[cron_job] producers that push onto a queue on a timer.
  • Observability — every queue span targets nestrs::queue with structured queue, job_id, elapsed_ms, attempt fields.
  • apps/worker/src/audio/processor.rs — the consumer.
  • apps/worker/src/audio/producer.rs — a scheduled producer.
  • crates/nestrs-queue/#[processor], QueueConnection, QueueWorker, the Processor trait.