Writing a queue driver
The nest-rs-queue
crate is a seam: a JobProducer trait on the push side, a link-time
ProcessMethod inventory + a wire envelope on the consume side, and a
Transport slot the framework drives at boot. The seam names no storage
and no worker runtime — Redis, NATS JetStream, SQS, RabbitMQ, an
in-process channel, a Postgres queue table all plug in the same way.
This page gives you the shape of a driver, not its implementation:
the traits you implement, the registry you drain, with "// your code here"
markers where the storage-specific work lands. The reference impl
is nest-rs-redis
(apalis on Redis) — read it once when a placeholder is unclear.
What you’ll implement
- A JobProducer impl: wrap the wire envelope, push to your storage.
- A Transport impl: drain the link-time ProcessMethod inventory and dispatch each envelope to its handler.
- Two Modules: one for the producer (apps that only enqueue), one for the worker (apps that consume).

A driver lives in a nestrs-<technology> crate that depends on
nest-rs-core, nest-rs-queue, plus whatever your storage and runtime
need. An app picks a driver by importing its module instead of
nest-rs-redis.
The wire envelope
The one binding rule across backends: the producer wraps, the macro-emitted consumer unwraps, the storage in between only moves JSON.

```json
{ "v": 1, "payload": <user job as JSON> }
```

WIRE_FORMAT_VERSION is exported from nest-rs-queue; use it instead of
hard-coding 1. The macro on the #[process] side validates the
version and fails closed with a clear error on mismatch, so a producer
that wraps a different shape surfaces immediately. Mismatching this is
the most common driver bug, so wrap exactly once at the producer edge.
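
The wrap-once, fail-closed rule can be modelled without any framework types. The following is a minimal std-only sketch, not the macro's actual code: `Envelope`, `wrap`, and `unwrap_envelope` are hypothetical names, the payload is kept as JSON text instead of `serde_json::Value`, and the local `WIRE_FORMAT_VERSION` constant is a stand-in for the one exported by nest-rs-queue.

```rust
// Stand-in for nest_rs_queue::WIRE_FORMAT_VERSION. The value 1 mirrors the
// envelope shown above and is hard-coded only to keep this sketch std-only.
const WIRE_FORMAT_VERSION: u32 = 1;

/// Hypothetical model of the envelope: `payload` holds the user job as JSON text.
#[derive(Debug, Clone, PartialEq)]
struct Envelope {
    v: u32,
    payload: String,
}

/// Producer edge: wrap exactly once, stamping the current version.
fn wrap(payload: &str) -> Envelope {
    Envelope {
        v: WIRE_FORMAT_VERSION,
        payload: payload.to_string(),
    }
}

/// Consumer edge: fail closed on any version other than the one we understand.
fn unwrap_envelope(envelope: &Envelope) -> Result<&str, String> {
    if envelope.v != WIRE_FORMAT_VERSION {
        return Err(format!(
            "unsupported wire format version {} (expected {})",
            envelope.v, WIRE_FORMAT_VERSION
        ));
    }
    Ok(&envelope.payload)
}
```

A round trip through `wrap` and `unwrap_envelope` returns the original payload, while an envelope stamped with any other version is rejected before the payload is touched.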
Step 1 — The producer
JobProducer::push_json is the only method the framework asks a
producer to implement. Per-job options, batching, scheduled jobs,
priority hints — those are your driver’s own API on the same struct.
```rust
use async_trait::async_trait;

/// Your driver's connection handle. The producer side of a queue.
#[derive(Clone)]
pub struct YourConnection {
    // your storage handle: a Redis pool, an SQS client, a NATS
    // connection, a channel sender, a Postgres pool...
}

#[async_trait]
impl nest_rs_queue::JobProducer for YourConnection {
    async fn push_json(&self, queue: &str, payload: serde_json::Value) -> anyhow::Result<()> {
        // (1) Wrap in the wire envelope: required for the
        //     macro-emitted consumer to decode it.
        let envelope = serde_json::json!({
            "v": nest_rs_queue::WIRE_FORMAT_VERSION,
            "payload": payload,
        });

        // (2) Push the envelope onto your storage for `queue`.
        //     Your code: `self.client.send(queue, envelope).await?`,
        //     `RPUSH`, `Publish`, `INSERT INTO jobs (...) VALUES (...)`,
        //     `SendMessage`, ...
        Ok(())
    }
}
```

The typed producer.push::<Job>(queue, job).await that call sites see is a
blanket impl on top of push_json, so it needs no extra code on your side.
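
To illustrate why a typed push needs nothing from the driver, here is a std-only sketch of the blanket-impl pattern. It is an illustration of the technique, not the framework's source: the local `JobProducer` is a synchronous stand-in for the real async trait, `ToJson` is a hypothetical substitute for `serde::Serialize`, `TypedPush`, `RecordingProducer`, and `SendEmail` are invented for the example, and envelope wrapping is left out to keep the focus on the trait layering.

```rust
use std::cell::RefCell;

/// Local stand-in for the framework trait. The real one is async and takes
/// a serde_json::Value; this one takes JSON text to stay std-only.
trait JobProducer {
    fn push_json(&self, queue: &str, payload: String) -> Result<(), String>;
}

/// Hypothetical serialization hook standing in for serde::Serialize.
trait ToJson {
    fn to_json(&self) -> String;
}

/// Typed convenience layer: one default method built on `push_json`.
trait TypedPush: JobProducer {
    fn push<J: ToJson>(&self, queue: &str, job: &J) -> Result<(), String> {
        self.push_json(queue, job.to_json())
    }
}

/// The blanket impl: every JobProducer gets the typed method for free.
impl<P: JobProducer + ?Sized> TypedPush for P {}

/// A toy driver that only records what it was asked to push.
struct RecordingProducer {
    pushed: RefCell<Vec<(String, String)>>,
}

impl JobProducer for RecordingProducer {
    fn push_json(&self, queue: &str, payload: String) -> Result<(), String> {
        self.pushed.borrow_mut().push((queue.to_string(), payload));
        Ok(())
    }
}

/// An example job type with a hand-written JSON encoding.
struct SendEmail {
    to: String,
}

impl ToJson for SendEmail {
    fn to_json(&self) -> String {
        format!("{{\"to\":\"{}\"}}", self.to)
    }
}
```

`RecordingProducer` implements only `push_json`, yet `producer.push("emails", &SendEmail { .. })` compiles and lands in the recorded list, which is the property a driver author relies on.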
Step 2 — The worker (Transport)
This is the meat. The worker is a nest_rs_core::Transport: the
framework calls configure with the assembled container at boot, then
serve with a cancellation token. Drain the inventory, route by queue
name, hand each envelope to the matching ProcessMethod::handler — the
macro owns everything after that. The five numbered comments mark the
places you write storage-specific code.
The framework owns the registry and the dispatch contract. Your code
owns the receive loop — how envelopes arrive (block-pop, subscribe,
long-poll, ReceiveMessage), how concurrency is bounded, how retries
and visibility timeouts wire up. Those are storage decisions.
```rust
use std::collections::HashMap;

use async_trait::async_trait;
use nest_rs_core::{Container, ReachableProviders, Transport, injectable, inventory};
use nest_rs_queue::ProcessMethod;
use tokio_util::sync::CancellationToken;

#[injectable]
pub struct YourWorker {
    // #[inject] YourConnection (or a separate consumer-side handle).
}

#[async_trait]
impl Transport for YourWorker {
    async fn configure(&mut self, _container: &Container) -> anyhow::Result<()> {
        Ok(())
    }

    async fn serve(self: Box<Self>, container: Container, cancel: CancellationToken) -> anyhow::Result<()> {
        // (1) Snapshot the inventory of #[process] methods. Filter by
        //     module reachability so a linked-but-unimported processor
        //     is silently skipped, the same property nest-rs-redis relies on.
        let reachable = container.get::<ReachableProviders>();
        let methods: Vec<&'static ProcessMethod> = inventory::iter::<ProcessMethod>()
            .filter(|m| {
                reachable
                    .as_ref()
                    .map_or(true, |r| r.0.contains(&(m.provider_type_id)()))
            })
            .collect();

        // (2) Group methods by queue name.
        let mut by_queue: HashMap<&'static str, Vec<&'static ProcessMethod>> = HashMap::new();
        for m in methods {
            by_queue.entry(m.queue).or_default().push(m);
        }

        // (3) For each queue, spawn a worker task that loops.
        //     Your code: replace `your_pop_from_queue(...)` with your
        //     storage's receive primitive: BRPOP, subscribe,
        //     ReceiveMessage, NOTIFY/LISTEN, a channel `recv`...
        //     Respect `cancel` for graceful shutdown.
        for (queue, methods) in by_queue {
            let container = container.clone();
            let cancel = cancel.clone();
            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        _ = cancel.cancelled() => break,
                        // Placeholder: `your_pop_from_queue` is an async fn you
                        // write that yields the next envelope as a serde_json::Value.
                        envelope = your_pop_from_queue(queue) => {
                            // (4) Dispatch. The macro-emitted handler takes
                            //     the raw serde_json::Value and the Container.
                            //     It internally:
                            //       - unwraps the wire envelope (v + payload),
                            //       - deserializes the typed Job,
                            //       - resolves the #[injectable] processor host,
                            //       - runs the method inside run_in_job_context(...),
                            //       - returns Err on failure for your retry budget
                            //         to act on.
                            //     You only feed it the Value; the macro does the rest.
                            for method in &methods {
                                if let Err(_err) = (method.handler)(envelope.clone(), container.clone()).await {
                                    // (5) Apply your retry/dead-letter policy
                                    //     here; it is storage-specific.
                                }
                            }
                        }
                    }
                }
            });
        }

        Ok(())
    }
}
```

The handler call, (method.handler)(envelope, container), is the
only thing your worker needs to know about a job. Inside that function
pointer the macro already emitted the envelope unwrap, the
serde_json::from_value to the user’s job type, the Container::get
of the #[injectable] host, and the run_in_job_context wrap. Your
code is the transport. The macro is the dispatcher.
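
The grouping and dispatch steps can be exercised in isolation. The sketch below is a simplified std-only model under stated assumptions: the local `ProcessMethod` keeps only a queue name and a synchronous handler (the real handler is async and also receives the Container), a static slice stands in for the link-time inventory, and the three handler functions are invented for illustration.

```rust
use std::collections::HashMap;

/// Stand-in for nest_rs_queue::ProcessMethod, reduced to the two fields this
/// sketch needs. The real handler is async and also takes the Container.
struct ProcessMethod {
    queue: &'static str,
    handler: fn(&str) -> Result<(), String>,
}

// Hypothetical handlers: two succeed, one always fails.
fn send_email(_envelope: &str) -> Result<(), String> {
    Ok(())
}

fn audit_email(_envelope: &str) -> Result<(), String> {
    Ok(())
}

fn resize_image(envelope: &str) -> Result<(), String> {
    Err(format!("cannot resize: {envelope}"))
}

/// Stand-in for the link-time inventory: a plain static slice.
static METHODS: &[ProcessMethod] = &[
    ProcessMethod { queue: "emails", handler: send_email },
    ProcessMethod { queue: "emails", handler: audit_email },
    ProcessMethod { queue: "images", handler: resize_image },
];

/// Step (2): group the registered methods by queue name.
fn group_by_queue(
    methods: &'static [ProcessMethod],
) -> HashMap<&'static str, Vec<&'static ProcessMethod>> {
    let mut by_queue: HashMap<&'static str, Vec<&'static ProcessMethod>> = HashMap::new();
    for m in methods {
        by_queue.entry(m.queue).or_default().push(m);
    }
    by_queue
}

/// Steps (4) and (5): hand one envelope to every method on the queue and
/// collect the errors so the caller can apply its own retry policy.
fn dispatch(methods: &[&ProcessMethod], envelope: &str) -> Vec<String> {
    methods
        .iter()
        .filter_map(|m| (m.handler)(envelope).err())
        .collect()
}
```

Returning the collected errors instead of discarding them keeps the retry decision in the receive loop, which is where the storage-specific policy lives.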
Step 3 — The modules
Two modules, with the same split as nest-rs-redis::{QueueModule, QueueWorkerModule}.
A producer-only app (an API binary that enqueues but doesn’t drain)
imports the first; a worker app imports both.
```rust
use nest_rs_core::{ContainerBuilder, DynamicModule, Module, TransportContribution};

/// Producer-side activation. Seeds `YourConnection` from config.
pub struct YourQueueModule;

impl YourQueueModule {
    pub fn for_root(config: impl Into<Option<YourConfig>>) -> YourQueueSetup {
        YourQueueSetup { pinned: config.into() }
    }
}

pub struct YourQueueSetup {
    pinned: Option<YourConfig>,
}

impl DynamicModule for YourQueueSetup {
    fn collect(&self, builder: ContainerBuilder) -> ContainerBuilder {
        // Your code: build your client from config (env + pinned),
        // seed it as `YourConnection`, and bind it to
        // `Arc<dyn nest_rs_queue::JobProducer>` so callers stay portable.
        builder
    }
}

/// Consumer-side activation. Contributes the `Transport`.
pub struct YourQueueWorkerModule;

impl Module for YourQueueWorkerModule {
    fn register(builder: ContainerBuilder) -> ContainerBuilder {
        builder.provide_meta(TransportContribution {
            name: "YourWorker",
            build: |_| Ok(Box::new(YourWorker { /* ... */ })),
        })
    }
}
```

Every field of YourConfig is settable both through the NESTRS_<NAMESPACE>__<KEY>
environment variable and by pinning it in code, per the framework-wide dual-path rule.
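
One way to resolve the env and pinned paths inside `collect` is sketched below, using only the standard library. Everything specific here is an assumption made for illustration: the single `url` field, the `NESTRS_YOURQUEUE__URL` variable name, and above all the precedence (pinned wins over environment), which the framework may define differently. The environment is passed in as a lookup closure so the logic can be tested without mutating process state.

```rust
/// Hypothetical config with a single field, to keep the sketch short.
#[derive(Debug, Clone, PartialEq)]
struct YourConfig {
    url: String,
}

/// Resolve the config from the two paths.
/// ASSUMPTION: a pinned value wins and the environment is the fallback;
/// both the precedence and the variable name are illustrative only.
fn resolve_config(
    pinned: Option<YourConfig>,
    env: impl Fn(&str) -> Option<String>,
) -> Result<YourConfig, String> {
    if let Some(config) = pinned {
        return Ok(config);
    }
    env("NESTRS_YOURQUEUE__URL")
        .map(|url| YourConfig { url })
        .ok_or_else(|| {
            "NESTRS_YOURQUEUE__URL is not set and no config was pinned".to_string()
        })
}
```

In a real driver the lookup would typically be `|key| std::env::var(key).ok()`, and a missing value would surface as a boot-time error instead of a silent default.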
Step 4 — Using the integration
Application code stays put. The swap is one line in the app’s
module.rs:
```rust
// Producer-only API binary:
#[module(imports = [YourQueueModule::for_root(None), /* features */])]
pub struct ApiModule;

// Worker binary (consumes too):
#[module(imports = [
    YourQueueModule::for_root(None),
    YourQueueWorkerModule,
    /* features that ship #[processor]s */
])]
pub struct WorkerModule;
```

The #[processor] impl ... blocks in user code are unchanged: same
impl, same #[process(queue = "...")], same method signatures.
Ephemeral vs durable backends
The same three traits cover both shapes:
- Ephemeral (in-memory channels, in-process pub/sub): jobs vanish on restart, no cross-process delivery. Useful for tests and local CLIs where the queue’s lifetime equals the process’s.
- Durable (Redis, Postgres, SQS, RabbitMQ, NATS JetStream): retries, visibility timeouts, dead-letter queues. Your driver decides how to expose those — the framework takes no position.
No tradeoff is hidden. Both shapes plug into the same JobProducer and
Transport.
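
For a feel of the ephemeral shape, here is a minimal std-only sketch built on `std::sync::mpsc`. It is a toy, not a driver: `InMemoryProducer` and `in_memory_queue` are hypothetical names, `push_json` is synchronous and takes JSON text instead of `serde_json::Value`, and the version `1` is written inline where a real driver would use WIRE_FORMAT_VERSION.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Ephemeral producer half: the queue lives only as long as the channel does.
#[derive(Clone)]
struct InMemoryProducer {
    tx: Sender<(String, String)>,
}

impl InMemoryProducer {
    /// Wrap the payload in the envelope shape and send it with its queue name.
    /// `payload` is assumed to already be valid JSON text.
    fn push_json(&self, queue: &str, payload: &str) -> Result<(), String> {
        let envelope = format!("{{\"v\":1,\"payload\":{payload}}}");
        self.tx
            .send((queue.to_string(), envelope))
            .map_err(|e| e.to_string())
    }
}

/// Build a producer and the receiving half a worker loop would drain.
fn in_memory_queue() -> (InMemoryProducer, Receiver<(String, String)>) {
    let (tx, rx) = channel();
    (InMemoryProducer { tx }, rx)
}
```

Once the receiver is dropped, every push fails, and nothing survives a process restart. That is the ephemeral contract in miniature, and it is why this shape suits tests and local CLIs.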
What you don’t get for free
The framework gives you the contract: Job, Processor,
JobHandler, WIRE_FORMAT_VERSION, the Transport slot, the
JobContext ambient executor for handlers that touch a DB.
Retry semantics, batching, priorities, scheduled jobs, ordering,
visibility timeouts, dead-letter routing — those are your driver’s
contract with its storage. Decide them deliberately, document them in
the driver’s README.
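
As one example of deciding retry semantics deliberately, the sketch below implements a fixed attempt budget with a dead-letter outcome, using only the standard library. `Outcome` and `run_with_retries` are hypothetical names, the handler is a synchronous closure standing in for the macro-emitted one, and backoff, jitter, and persistence of the dead-lettered job are intentionally left out.

```rust
/// What happened to a job after the retry budget was applied.
#[derive(Debug, PartialEq)]
enum Outcome {
    Done { attempts: u32 },
    DeadLettered { attempts: u32, last_error: String },
}

/// Run `handler` up to `max_attempts` times, passing the 1-based attempt
/// number. Stop at the first success; dead-letter once the budget is spent.
fn run_with_retries(
    max_attempts: u32,
    mut handler: impl FnMut(u32) -> Result<(), String>,
) -> Outcome {
    let mut last_error = String::from("retry budget was zero");
    for attempt in 1..=max_attempts {
        match handler(attempt) {
            Ok(()) => return Outcome::Done { attempts: attempt },
            Err(err) => last_error = err,
        }
    }
    Outcome::DeadLettered {
        attempts: max_attempts,
        last_error,
    }
}
```

Whatever policy a driver picks, returning an explicit outcome like this makes the behaviour easy to document in the README and to assert in tests.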
Reference
- crates/nest-rs-redis/: the reference impl; read its connection.rs, worker.rs, and module.rs whenever a placeholder above needs a concrete shape.
- crates/nest-rs-queue/README.md: the extension contract, the ProcessMethod registry, the envelope spec.
- Database driver: the analogous scaffold for the data layer, same shape, same seams.