Producing jobs
Pushing a job is a one-line call — queue.of::<TranscodeDto>("audio").push(job).await? — but
where that line lives decides whether the feature stays auditable. The
rule is the same one that holds Repo calls behind a service:
controllers, resolvers, and gateways ask the service to enqueue. The
service knows the queue name; the transport adapter doesn’t.
The payload
Section titled “The payload”A job is any Serialize + DeserializeOwned + Clone + Send + Sync + 'static
type — the marker nest_rs_queue::Job is auto-implemented for every
type that satisfies those bounds. No derive, no trait to write. The
convention is to keep payloads small: a reference to the work, not the
work itself.
use schemars::JsonSchema;use serde::{Deserialize, Serialize};
pub const AUDIO_QUEUE: &str = "audio";
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]pub struct TranscodeDto { pub file: String,}JsonSchema is here because the same payload is also the HTTP body
that triggers the enqueue — one type, two transports. The producer and
consumer agree on the payload by importing it from the same crate; the
queue name (AUDIO_QUEUE) is the only stringly-typed coupling, and it
stays private to the service.
The service is the choke point
Section titled “The service is the choke point”A producer is any code that asks the service to enqueue. The HTTP
controller, the GraphQL resolver, the scheduled producer — all of them
delegate. They do not hold Arc<QueueConnection> and do not
know the queue name.
use std::sync::Arc;
use anyhow::Result;use nest_rs_core::injectable;use nest_rs_redis::QueueConnection;
use super::dto::{AUDIO_QUEUE, TranscodeDto};
#[injectable]pub struct AudioService { #[inject] queue: Arc<QueueConnection>,}
impl AudioService { pub async fn enqueue_transcode(&self, file: String) -> Result<()> { self.queue .of::<TranscodeDto>(AUDIO_QUEUE) .push(TranscodeDto { file: file.clone() }) .await?; tracing::info!(target: "features::audio", %file, "enqueued transcode job"); Ok(()) }
pub async fn transcode(&self, file: &str) -> Result<()> { Ok(()) }}queue.of::<T>(name) returns a typed Queue<T> publisher bound to the
queue name. push is async because Redis is async — that’s the whole
producer surface from a feature’s point of view. Inside the call,
nest-rs-redis wraps your payload in the wire envelope ({ "v": 1, "payload": ... }) so any other backend can drain it. You never see
that.
The HTTP adapter
Section titled “The HTTP adapter”The controller is a thin transport. It accepts the body, calls the service, returns the wire response — no queue knowledge of its own.
use std::sync::Arc;
use nest_rs_http::{controller, routes};use poem::http::StatusCode;use poem::web::Json;use poem::{Error, Result};
use crate::audio::{AudioService, TranscodeDto};
#[controller(path = "/audio")]#[use_guards(AuthGuard, AuthzGuard)]pub struct AudioController { #[inject] svc: Arc<AudioService>,}
#[routes]impl AudioController { #[post("/transcode")] async fn transcode(&self, body: Json<TranscodeDto>) -> Result<Json<TranscodeDto>> { let job = body.0; self.svc .enqueue_transcode(job.file.clone()) .await .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?; Ok(Json(job)) }}Swap this for a #[resolver] mutation, a #[gateway] message, or a
#[scheduled] method on a timer — the controller is the only file that
changes. The service stays the single audited entry point, every
transport hits it through enqueue_transcode.
Why this rule
Section titled “Why this rule”Three reasons the queue.of() call hides behind a service:
- One audit point per queue. The queue name lives in exactly one
file. A grep for
enqueue_transcodeshows every producer. - Pre-flight logic survives a swap of transport. If enqueueing needs an ability check, an idempotency key, a tracing field — the service holds it. Adding a second producer doesn’t duplicate that.
- The consumer can change without touching producers. Renaming the
queue, splitting
audiointoaudio.fastandaudio.slow, moving from Redis to SQS — one file changes, every adapter follows.
The same shape the rest of the framework uses for Repo, just on the
push side. The service is the contract; transports adapt.
Reference
Section titled “Reference”crates/features/src/audio/service.rs—enqueue_transcode.crates/features/src/audio/http/controller.rs— the HTTP producer.crates/features/src/audio/schedule/producer.rs— the scheduled producer (a timer that calls the same service).crates/nest-rs-redis/src/connection.rs—QueueConnection::ofandQueue::push.crates/nest-rs-queue/src/producer.rs—JobProducer,JobProducerExt.