Writing a queue driver

The nest-rs-queue crate is a seam: a JobProducer trait on the push side, a link-time ProcessMethod inventory + a wire envelope on the consume side, and a Transport slot the framework drives at boot. The seam names no storage and no worker runtime — Redis, NATS JetStream, SQS, RabbitMQ, an in-process channel, a Postgres queue table all plug in the same way. This page gives you the shape of a driver, not its implementation: the traits you implement, the registry you drain, with // your code here markers where the storage-specific work lands. The reference impl is nest-rs-redis (apalis on Redis) — read it once when a placeholder is unclear.

  • A JobProducer impl: wrap the wire envelope, push to your storage.
  • A Transport impl: drain the link-time ProcessMethod inventory and dispatch each envelope to its handler.
  • Two Modules — one for the producer (apps that only enqueue), one for the worker (apps that consume).

A driver lives in a nestrs-<technology> crate that depends on nest-rs-core, nest-rs-queue, plus whatever your storage and runtime need. An app picks a driver by importing its module instead of nest-rs-redis.

One rule binds every backend: the producer wraps, the macro-emitted consumer unwraps, and the storage in between only moves JSON.

{ "v": 1, "payload": <user job as JSON> }

WIRE_FORMAT_VERSION is exported from nest-rs-queue; use it instead of hard-coding 1. The macro on the #[process] side validates the version and fails closed with a clear error on mismatch, so a producer that wraps a different shape surfaces immediately. A mismatched envelope (wrapped twice, not wrapped at all, or wrapped with the wrong version) is the most common driver bug, so wrap exactly once, at the producer edge.
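The fail-closed check described above can be modeled with the standard library alone. This is a minimal sketch under stated assumptions: `Envelope`, `wrap`, and `unwrap_envelope` are hypothetical names for illustration, the payload is kept as an opaque string rather than real JSON, and the local `WIRE_FORMAT_VERSION` constant only stands in for the one nest-rs-queue exports. The real check lives in the macro-emitted consumer.

```rust
/// Stand-in for `nest_rs_queue::WIRE_FORMAT_VERSION` (value and type assumed).
const WIRE_FORMAT_VERSION: u32 = 1;

/// Simplified model of `{ "v": ..., "payload": ... }`. The payload is an
/// opaque JSON string here; the real envelope is a JSON value.
#[derive(Debug, Clone, PartialEq)]
struct Envelope {
    v: u32,
    payload: String,
}

/// Producer edge: wrap the user job exactly once.
fn wrap(payload: &str) -> Envelope {
    Envelope {
        v: WIRE_FORMAT_VERSION,
        payload: payload.to_string(),
    }
}

/// Consumer edge: reject any envelope whose version is not the expected one.
fn unwrap_envelope(envelope: &Envelope) -> Result<&str, String> {
    if envelope.v != WIRE_FORMAT_VERSION {
        return Err(format!(
            "unsupported wire format version {} (expected {})",
            envelope.v, WIRE_FORMAT_VERSION
        ));
    }
    Ok(&envelope.payload)
}
```

Returning an error instead of attempting a best-effort decode is what makes a mis-wrapping producer visible on the first job rather than after silent data loss.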

JobProducer::push_json is the only method the framework asks a producer to implement. Per-job options, batching, scheduled jobs, priority hints — those are your driver’s own API on the same struct.

use async_trait::async_trait;

/// Your driver's connection handle. The producer side of a queue.
#[derive(Clone)]
pub struct YourConnection {
    // Your storage handle: a Redis pool, an SQS client, a NATS
    // connection, a channel sender, a Postgres pool...
}

#[async_trait]
impl nest_rs_queue::JobProducer for YourConnection {
    async fn push_json(&self, queue: &str, payload: serde_json::Value) -> anyhow::Result<()> {
        // (1) Wrap in the wire envelope. This is required for the
        //     macro-emitted consumer to decode it.
        let envelope = serde_json::json!({
            "v": nest_rs_queue::WIRE_FORMAT_VERSION,
            "payload": payload,
        });
        // (2) Push the envelope onto your storage for `queue`.
        //     Your code: `self.client.send(queue, envelope).await?`,
        //     `RPUSH`, `Publish`, `INSERT INTO jobs (...) VALUES (...)`,
        //     `SendMessage`, ...
        Ok(())
    }
}

The typed producer.push::<Job>(queue, job).await that call sites see is a blanket impl on top of push_json, so it needs no extra code on your side.
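The blanket-impl pattern behind that typed call can be sketched in synchronous, std-only Rust. Everything here is a hypothetical model, not the nest-rs-queue source: `RawProducer` stands in for JobProducer, `ToWire` stands in for serde serialization, and `RecordingProducer` is a toy driver that records pushes in memory.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for the one method a driver implements.
trait RawProducer {
    fn push_raw(&self, queue: &str, payload: String) -> Result<(), String>;
}

/// Hypothetical stand-in for job serialization (the real crate uses serde).
trait ToWire {
    fn to_wire(&self) -> String;
}

/// Typed convenience layer with a default method built on `push_raw`.
trait TypedProducer: RawProducer {
    fn push<J: ToWire>(&self, queue: &str, job: &J) -> Result<(), String> {
        self.push_raw(queue, job.to_wire())
    }
}

// The blanket impl: every type that implements `RawProducer` gets `push`.
impl<P: RawProducer> TypedProducer for P {}

/// A toy driver that records pushes in memory.
#[derive(Default)]
struct RecordingProducer {
    sent: RefCell<Vec<(String, String)>>,
}

impl RawProducer for RecordingProducer {
    fn push_raw(&self, queue: &str, payload: String) -> Result<(), String> {
        self.sent.borrow_mut().push((queue.to_string(), payload));
        Ok(())
    }
}

/// An example job type.
struct SendEmail {
    to: &'static str,
}

impl ToWire for SendEmail {
    fn to_wire(&self) -> String {
        format!("{{\"to\":\"{}\"}}", self.to)
    }
}
```

With these definitions, `RecordingProducer::default().push("emails", &SendEmail { to: "a@example.com" })` compiles even though `RecordingProducer` never mentions `push`: the driver author writes only the raw method.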

This is the meat. The worker is a nest_rs_core::Transport: the framework calls configure with the assembled container at boot, then serve with a cancellation token. Drain the inventory, route by queue name, and hand each envelope to the matching ProcessMethod::handler; the macro owns everything after that. The five numbered comments mark the steps in order; the storage-specific code lands at (3), the receive primitive, and (5), the retry policy.

The framework owns the registry and the dispatch contract. Your code owns the receive loop — how envelopes arrive (block-pop, subscribe, long-poll, ReceiveMessage), how concurrency is bounded, how retries and visibility timeouts wire up. Those are storage decisions.

use std::collections::HashMap;

use async_trait::async_trait;
use nest_rs_core::{Container, ReachableProviders, Transport, injectable, inventory};
use nest_rs_queue::ProcessMethod;
use tokio_util::sync::CancellationToken;

#[injectable]
pub struct YourWorker {
    // #[inject] YourConnection (or a separate consumer-side handle).
}

#[async_trait]
impl Transport for YourWorker {
    async fn configure(&mut self, _container: &Container) -> anyhow::Result<()> {
        Ok(())
    }

    async fn serve(self: Box<Self>, container: Container, cancel: CancellationToken)
        -> anyhow::Result<()>
    {
        // (1) Snapshot the inventory of #[process] methods. Filter by
        //     module reachability so a linked-but-unimported processor
        //     is silently skipped (the same property nest-rs-redis relies on).
        let reachable = container.get::<ReachableProviders>();
        let methods: Vec<&'static ProcessMethod> = inventory::iter::<ProcessMethod>()
            .filter(|m| {
                reachable
                    .as_ref()
                    .map_or(true, |r| r.0.contains(&(m.provider_type_id)()))
            })
            .collect();

        // (2) Group methods by queue name.
        let mut by_queue: HashMap<&'static str, Vec<&'static ProcessMethod>> = HashMap::new();
        for m in methods {
            by_queue.entry(m.queue).or_default().push(m);
        }

        // (3) For each queue, spawn a worker task that loops.
        //     Your code: replace `your_pop_from_queue(...)` with your
        //     storage's receive primitive: BRPOP, subscribe,
        //     ReceiveMessage, NOTIFY/LISTEN, a channel `recv`...
        //     Respect `cancel` for graceful shutdown.
        for (queue, methods) in by_queue {
            let container = container.clone();
            let cancel = cancel.clone();
            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        _ = cancel.cancelled() => break,
                        // Placeholder: `your_pop_from_queue` is yours to write.
                        envelope = your_pop_from_queue(queue) => {
                            // (4) Dispatch. The macro-emitted handler takes
                            //     the raw serde_json::Value and the Container.
                            //     It internally:
                            //       - unwraps the wire envelope (v + payload),
                            //       - deserializes the typed Job,
                            //       - resolves the #[injectable] processor host,
                            //       - runs the method inside run_in_job_context(...),
                            //       - returns Err on failure for your retry budget
                            //         to act on.
                            //     You only feed it the Value; the macro does the rest.
                            for method in &methods {
                                if let Err(_err) =
                                    (method.handler)(envelope.clone(), container.clone()).await
                                {
                                    // (5) Apply your retry/dead-letter policy
                                    //     here. It is storage-specific.
                                }
                            }
                        }
                    }
                }
            });
        }
        Ok(())
    }
}

The handler call — (method.handler)(envelope, container) — is the only thing your worker needs to know about a job. Inside that function pointer the macro already emitted the envelope unwrap, the serde_json::from_value to the user’s job type, the Container::get of the #[injectable] host, and the run_in_job_context wrap. Your code is the transport. The macro is the dispatcher.
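The group-then-dispatch flow can be tried out without the framework. The sketch below is a simplified, synchronous model: `Method`, `REGISTRY`, `group_by_queue`, and `dispatch` are hypothetical names, the registry is a plain static array instead of a link-time inventory, and the handler is a plain `fn` pointer over a string, whereas the real ProcessMethod handler is async and also takes the Container.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified registry entry: a queue name plus a handler.
struct Method {
    queue: &'static str,
    handler: fn(&str) -> Result<(), String>,
}

fn send_email(envelope: &str) -> Result<(), String> {
    if envelope.is_empty() {
        Err("empty envelope".to_string())
    } else {
        Ok(())
    }
}

fn audit_email(_envelope: &str) -> Result<(), String> {
    Ok(())
}

fn resize_image(_envelope: &str) -> Result<(), String> {
    Ok(())
}

/// Stand-in for the link-time inventory.
static REGISTRY: [Method; 3] = [
    Method { queue: "emails", handler: send_email },
    Method { queue: "emails", handler: audit_email },
    Method { queue: "images", handler: resize_image },
];

/// Group registry entries by queue name so each queue gets one receive loop.
fn group_by_queue(
    methods: &'static [Method],
) -> HashMap<&'static str, Vec<&'static Method>> {
    let mut by_queue: HashMap<&'static str, Vec<&'static Method>> = HashMap::new();
    for m in methods {
        by_queue.entry(m.queue).or_default().push(m);
    }
    by_queue
}

/// Hand one envelope to every handler registered for `queue` and collect the
/// errors, leaving the retry or dead-letter decision to the caller.
fn dispatch(
    by_queue: &HashMap<&'static str, Vec<&'static Method>>,
    queue: &str,
    envelope: &str,
) -> Vec<String> {
    by_queue
        .get(queue)
        .into_iter()
        .flatten()
        .filter_map(|m| (m.handler)(envelope).err())
        .collect()
}
```

The worker never inspects the job itself: it looks up the handlers for a queue, calls each function pointer, and reacts only to the `Result`.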

Two modules, the same split as nest_rs_redis::{QueueModule, QueueWorkerModule}. A producer-only app (an API binary that enqueues but doesn’t drain) imports the first; a worker app imports both.

use nest_rs_core::{ContainerBuilder, DynamicModule, Module, TransportContribution};

/// Producer-side activation. Seeds `YourConnection` from config.
pub struct YourQueueModule;

impl YourQueueModule {
    pub fn for_root(config: impl Into<Option<YourConfig>>) -> YourQueueSetup {
        YourQueueSetup { pinned: config.into() }
    }
}

pub struct YourQueueSetup {
    pinned: Option<YourConfig>,
}

impl DynamicModule for YourQueueSetup {
    fn collect(&self, builder: ContainerBuilder) -> ContainerBuilder {
        // Your code: build your client from config (env + pinned),
        // seed it as `YourConnection`, and bind it to
        // `Arc<dyn nest_rs_queue::JobProducer>` so callers stay portable.
        builder
    }
}

/// Consumer-side activation. Contributes the `Transport`.
pub struct YourQueueWorkerModule;

impl Module for YourQueueWorkerModule {
    fn register(builder: ContainerBuilder) -> ContainerBuilder {
        builder.provide_meta(TransportContribution {
            name: "YourWorker",
            build: |_| Ok(Box::new(YourWorker { /* ... */ })),
        })
    }
}

Every field of YourConfig can be set either through a NESTRS_<NAMESPACE>__<KEY> environment variable or by pinning it in code. This is the framework-wide dual-path rule.
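One way a driver might resolve a single field along both paths is sketched below. The helper names `env_key` and `resolve_field` are hypothetical, upper-casing the namespace and key is an assumption, and so is the precedence: this sketch lets a pinned value win over the environment, which may not match what the framework actually does.

```rust
/// Build the environment key for a config field, following the
/// `NESTRS_<NAMESPACE>__<KEY>` pattern. Upper-casing is an assumption.
fn env_key(namespace: &str, key: &str) -> String {
    format!("NESTRS_{}__{}", namespace.to_uppercase(), key.to_uppercase())
}

/// Resolve one field. ASSUMPTION: a value pinned in code wins over the
/// environment; the framework's real precedence may differ.
/// `lookup` abstracts the environment so the function is easy to test.
fn resolve_field(
    pinned: Option<&str>,
    namespace: &str,
    key: &str,
    lookup: impl Fn(&str) -> Option<String>,
) -> Option<String> {
    pinned
        .map(str::to_string)
        .or_else(|| lookup(&env_key(namespace, key)))
}
```

In a real driver the lookup would typically be `|k| std::env::var(k).ok()`; passing it in as a closure keeps the resolution logic independent of process state.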

Application code stays put. The swap is one line in the app’s module.rs:

// Producer-only API binary:
#[module(imports = [YourQueueModule::for_root(None), /* features */])]
pub struct ApiModule;

// Worker binary (consumes too):
#[module(imports = [
    YourQueueModule::for_root(None),
    YourQueueWorkerModule,
    /* features that ship #[processor]s */
])]
pub struct WorkerModule;

The #[processor] impl ... blocks in user code are unchanged — same impl, same #[process(queue = "...")], same method signatures.

The same traits cover both shapes:

  • Ephemeral (in-memory channels, in-process pub/sub): jobs vanish on restart, no cross-process delivery. Useful for tests and local CLIs where the queue’s lifetime equals the process’s.
  • Durable (Redis, Postgres, SQS, RabbitMQ, NATS JetStream): retries, visibility timeouts, dead-letter queues. Your driver decides how to expose those — the framework takes no position.

No tradeoff is hidden. Both shapes plug into the same JobProducer and Transport.
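As an illustration of the ephemeral shape, here is a minimal in-process queue built on standard-library channels. `InMemoryQueues` and its methods are hypothetical; a real driver would put the push side behind JobProducer and the pop side inside a Transport receive loop.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical in-process queue: one std channel per queue name.
/// Everything lives in memory, so pending jobs are lost when the process exits.
struct InMemoryQueues {
    senders: HashMap<String, Sender<String>>,
    receivers: HashMap<String, Receiver<String>>,
}

impl InMemoryQueues {
    /// Create one channel for each named queue.
    fn new(queues: &[&str]) -> Self {
        let mut senders = HashMap::new();
        let mut receivers = HashMap::new();
        for &name in queues {
            let (tx, rx) = channel();
            senders.insert(name.to_string(), tx);
            receivers.insert(name.to_string(), rx);
        }
        Self { senders, receivers }
    }

    /// Producer side: enqueue an already-wrapped envelope.
    fn push(&self, queue: &str, envelope: String) -> Result<(), String> {
        let tx = self
            .senders
            .get(queue)
            .ok_or_else(|| format!("unknown queue: {}", queue))?;
        tx.send(envelope).map_err(|e| e.to_string())
    }

    /// Worker side: non-blocking pop. Returns `None` when the queue is
    /// empty or unknown.
    fn try_pop(&self, queue: &str) -> Option<String> {
        self.receivers.get(queue)?.try_recv().ok()
    }
}
```

There are no retries, visibility timeouts, or cross-process delivery here, which is exactly the tradeoff the ephemeral shape accepts.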

The framework gives you the contract: Job, Processor, JobHandler, WIRE_FORMAT_VERSION, the Transport slot, and the JobContext ambient executor for handlers that touch a DB. Retry semantics, batching, priorities, scheduled jobs, ordering, visibility timeouts, and dead-letter routing are your driver’s contract with its storage. Decide them deliberately and document them in the driver’s README.
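As one example of such a decision, a driver could pair a fixed attempt budget with capped exponential backoff. The sketch below is purely illustrative: `RetryPolicy`, `Outcome`, and the chosen formula are hypothetical and not something the framework or nest-rs-redis is known to use.

```rust
/// What a worker does with a job after a handler returns `Err`.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Re-enqueue the job after waiting `delay_ms` milliseconds.
    Retry { delay_ms: u64 },
    /// Retry budget exhausted: route the job to a dead-letter queue.
    DeadLetter,
}

/// Hypothetical policy: capped exponential backoff with a fixed attempt budget.
struct RetryPolicy {
    max_attempts: u32,
    base_delay_ms: u64,
    max_delay_ms: u64,
}

impl RetryPolicy {
    /// Decide what to do after a failure. `attempt` is 1 for the first
    /// failed delivery.
    fn on_failure(&self, attempt: u32) -> Outcome {
        if attempt >= self.max_attempts {
            return Outcome::DeadLetter;
        }
        // delay = base * 2^(attempt - 1), saturating instead of overflowing,
        // then capped at `max_delay_ms`.
        let factor = 1u64
            .checked_shl(attempt.saturating_sub(1))
            .unwrap_or(u64::MAX);
        let delay = self
            .base_delay_ms
            .saturating_mul(factor)
            .min(self.max_delay_ms);
        Outcome::Retry { delay_ms: delay }
    }
}
```

Keeping the policy a pure function of the attempt count makes it easy to document in the README and to test without any storage attached.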

  • crates/nest-rs-redis/ — the reference impl; read its connection.rs, worker.rs, and module.rs whenever a placeholder above needs a concrete shape.
  • crates/nest-rs-queue/README.md — the extension contract, the ProcessMethod registry, the envelope spec.
  • Database driver — the analogous scaffold for the data layer, same shape, same seams.