
Producing jobs

Pushing a job is a one-line call — queue.of::<TranscodeDto>("audio").push(job).await? — but where that line lives decides whether the feature stays auditable. The rule is the same one that holds Repo calls behind a service: controllers, resolvers, and gateways ask the service to enqueue. The service knows the queue name; the transport adapter doesn’t.

A job is any Serialize + DeserializeOwned + Clone + Send + Sync + 'static type — the marker nest_rs_queue::Job is auto-implemented for every type that satisfies those bounds. No derive, no trait to write. The convention is to keep payloads small: a reference to the work, not the work itself.

crates/features/src/audio/dto.rs
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

pub const AUDIO_QUEUE: &str = "audio";

#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
pub struct TranscodeDto {
    pub file: String,
}

JsonSchema is here because the same payload is also the HTTP body that triggers the enqueue: one type, two transports. The producer and consumer agree on the payload by importing it from the same crate. The queue name (AUDIO_QUEUE) is the only stringly-typed coupling, and although the constant is declared pub next to the DTO, only the service references it.
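The auto-implemented Job marker described above is what Rust calls a blanket impl. The following is a minimal sketch of that pattern, not the actual nest_rs_queue source. It assumes the real trait also carries the Serialize + DeserializeOwned bounds, which are left out here so the example compiles with the standard library alone. TranscodeRef and job_type_name are hypothetical names used only for illustration.

```rust
/// Marker for anything that may travel through a queue.
/// Assumption: the real `nest_rs_queue::Job` additionally requires
/// `Serialize + DeserializeOwned`; those bounds are omitted in this sketch.
pub trait Job: Clone + Send + Sync + 'static {}

/// Blanket impl: every type that meets the bounds is a `Job` automatically,
/// so feature code never writes a derive or a manual impl.
impl<T> Job for T where T: Clone + Send + Sync + 'static {}

/// Hypothetical payload in the spirit of `TranscodeDto`:
/// a reference to the work, not the work itself.
#[derive(Clone, Debug, PartialEq)]
pub struct TranscodeRef {
    pub file: String,
}

/// Hypothetical helper that accepts only `Job` types and reports the type name.
pub fn job_type_name<J: Job>(_job: &J) -> &'static str {
    std::any::type_name::<J>()
}
```

Passing a TranscodeRef or a plain u32 to job_type_name compiles because both satisfy the bounds. A type such as std::rc::Rc<String> would be rejected at compile time, since it is not Send.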

A producer is any code that asks the service to enqueue. The HTTP controller, the GraphQL resolver, the scheduled producer — all of them delegate. They do not hold Arc<QueueConnection> and do not know the queue name.

crates/features/src/audio/service.rs
use std::sync::Arc;

use anyhow::Result;
use nest_rs_core::injectable;
use nest_rs_redis::QueueConnection;

use super::dto::{AUDIO_QUEUE, TranscodeDto};

#[injectable]
pub struct AudioService {
    #[inject]
    queue: Arc<QueueConnection>,
}

impl AudioService {
    pub async fn enqueue_transcode(&self, file: String) -> Result<()> {
        self.queue
            .of::<TranscodeDto>(AUDIO_QUEUE)
            .push(TranscodeDto { file: file.clone() })
            .await?;
        tracing::info!(target: "features::audio", %file, "enqueued transcode job");
        Ok(())
    }

    pub async fn transcode(&self, file: &str) -> Result<()> {
        Ok(())
    }
}

queue.of::<T>(name) returns a typed Queue<T> publisher bound to the queue name. push is async because Redis is async — that’s the whole producer surface from a feature’s point of view. Inside the call, nest-rs-redis wraps your payload in the wire envelope ({ "v": 1, "payload": ... }) so any other backend can drain it. You never see that.
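To make the typed-handle idea and the envelope concrete, here is a rough in-memory sketch. It is not the nest-rs-redis implementation: the types below only reuse the names QueueConnection, Queue, and TranscodeDto, storage is a HashMap instead of Redis, push is synchronous, and the ToJson trait and drain method are hypothetical stand-ins (for serde and for a consumer). The envelope shape follows the { "v": 1, "payload": ... } form described above; the exact bytes the real crate writes are an assumption.

```rust
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for serde serialization, so the sketch needs no crates.
pub trait ToJson {
    fn to_json(&self) -> String;
}

type Store = Arc<Mutex<HashMap<String, Vec<String>>>>;

/// In-memory stand-in for the Redis-backed connection: queue name -> raw messages.
#[derive(Clone, Default)]
pub struct QueueConnection {
    store: Store,
}

/// Typed publisher bound to one queue name.
pub struct Queue<T> {
    name: String,
    store: Store,
    _payload: PhantomData<fn(T)>,
}

impl QueueConnection {
    /// Binds a payload type to a queue name.
    pub fn of<T: ToJson>(&self, name: &str) -> Queue<T> {
        Queue {
            name: name.to_string(),
            store: Arc::clone(&self.store),
            _payload: PhantomData,
        }
    }

    /// Hypothetical helper: removes and returns everything pushed to `name`.
    pub fn drain(&self, name: &str) -> Vec<String> {
        self.store.lock().unwrap().remove(name).unwrap_or_default()
    }
}

impl<T: ToJson> Queue<T> {
    /// Wraps the payload in the versioned envelope, then stores it.
    /// Synchronous here; the real `push` is async.
    pub fn push(&self, job: T) {
        let envelope = format!("{{\"v\":1,\"payload\":{}}}", job.to_json());
        self.store
            .lock()
            .unwrap()
            .entry(self.name.clone())
            .or_default()
            .push(envelope);
    }
}

pub struct TranscodeDto {
    pub file: String,
}

impl ToJson for TranscodeDto {
    fn to_json(&self) -> String {
        // No escaping: adequate only for the plain file names used in this sketch.
        format!("{{\"file\":\"{}\"}}", self.file)
    }
}
```

Because the payload type is fixed by of::<T>, pushing a different type onto the same handle is a compile error, while the caller never touches the envelope.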

The controller is a thin transport. It accepts the body, calls the service, returns the wire response — no queue knowledge of its own.

crates/features/src/audio/http/controller.rs
use std::sync::Arc;

use nest_rs_http::{controller, routes};
use poem::http::StatusCode;
use poem::web::Json;
use poem::{Error, Result};

use crate::audio::{AudioService, TranscodeDto};

#[controller(path = "/audio")]
#[use_guards(AuthGuard, AuthzGuard)]
pub struct AudioController {
    #[inject]
    svc: Arc<AudioService>,
}

#[routes]
impl AudioController {
    #[post("/transcode")]
    async fn transcode(&self, body: Json<TranscodeDto>) -> Result<Json<TranscodeDto>> {
        let job = body.0;
        self.svc
            .enqueue_transcode(job.file.clone())
            .await
            .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
        Ok(Json(job))
    }
}

Swap this for a #[resolver] mutation, a #[gateway] message, or a #[scheduled] method on a timer: the controller is the only file that changes. The service stays the single audited entry point, because every transport reaches the queue through enqueue_transcode.
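The swap can be illustrated without any framework macros. In the sketch below, which is an illustration under stated assumptions rather than nest-rs code, AudioService is a stand-in that records pushes in memory, and http_transcode and scheduled_tick are hypothetical adapters playing the roles of the controller and the timer. The empty-file check is also hypothetical; it exists only to exercise the error path that the controller maps to a 500.

```rust
use std::sync::Mutex;

/// Stand-in service: records what would be pushed instead of talking to Redis.
pub struct AudioService {
    queue_name: &'static str, // the only place the queue name appears
    pushed: Mutex<Vec<(String, String)>>,
}

impl AudioService {
    pub fn new() -> Self {
        Self { queue_name: "audio", pushed: Mutex::new(Vec::new()) }
    }

    /// Single entry point shared by every transport.
    pub fn enqueue_transcode(&self, file: String) -> Result<(), String> {
        // Hypothetical validation, present only to show an error path.
        if file.is_empty() {
            return Err("file must not be empty".to_string());
        }
        self.pushed
            .lock()
            .unwrap()
            .push((self.queue_name.to_string(), file));
        Ok(())
    }

    /// Snapshot of (queue, file) pairs pushed so far.
    pub fn pushed(&self) -> Vec<(String, String)> {
        self.pushed.lock().unwrap().clone()
    }
}

/// HTTP-style adapter: maps the service result to a status code.
pub fn http_transcode(svc: &AudioService, body_file: &str) -> u16 {
    match svc.enqueue_transcode(body_file.to_string()) {
        Ok(()) => 200,
        Err(_) => 500,
    }
}

/// Timer-style adapter: same service call, no queue name in sight.
pub fn scheduled_tick(svc: &AudioService) {
    let _ = svc.enqueue_transcode("nightly.wav".to_string());
}
```

Neither adapter mentions "audio"; both land on the same queue because the service alone decides where a job goes.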

Three reasons the queue.of() call hides behind a service:

  1. One audit point per queue. The queue name lives in exactly one file. A grep for enqueue_transcode shows every producer.
  2. Pre-flight logic survives a swap of transport. If enqueueing needs an ability check, an idempotency key, a tracing field — the service holds it. Adding a second producer doesn’t duplicate that.
  3. The consumer can change without touching producers. Renaming the queue, splitting audio into audio.fast and audio.slow, moving from Redis to SQS — one file changes, every adapter follows.
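Reasons 2 and 3 can be sketched together. The example below is hypothetical throughout: it uses the file name as an idempotency key, an in-memory HashSet as the dedupe store, and a made-up routing rule that sends .wav files to audio.slow and everything else to audio.fast. None of this is taken from the framework; the point is only that both concerns sit inside the service, so a caller's enqueue_transcode call looks the same before and after they are added.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Outcome of an enqueue attempt.
#[derive(Debug, PartialEq)]
pub enum Enqueued {
    /// Job accepted; carries the queue it was routed to.
    Pushed(&'static str),
    /// Same key seen before; nothing was pushed.
    Duplicate,
}

pub struct AudioService {
    // Hypothetical idempotency store; a real one would likely live in Redis.
    seen: Mutex<HashSet<String>>,
}

impl AudioService {
    pub fn new() -> Self {
        Self { seen: Mutex::new(HashSet::new()) }
    }

    /// Hypothetical routing rule, private to the service.
    fn route(file: &str) -> &'static str {
        if file.ends_with(".wav") { "audio.slow" } else { "audio.fast" }
    }

    /// Pre-flight dedupe, then routing. Adapters only ever call this method.
    pub fn enqueue_transcode(&self, file: &str) -> Enqueued {
        // `HashSet::insert` returns false when the value was already present.
        if !self.seen.lock().unwrap().insert(file.to_string()) {
            return Enqueued::Duplicate;
        }
        Enqueued::Pushed(Self::route(file))
    }
}
```

A second request for the same file is dropped no matter which transport sent it, and the split into two queues is invisible to every adapter.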

This is the same shape the rest of the framework uses for Repo, just on the push side. The service is the contract; transports adapt.

  • crates/features/src/audio/service.rs: enqueue_transcode.
  • crates/features/src/audio/http/controller.rs: the HTTP producer.
  • crates/features/src/audio/schedule/producer.rs: the scheduled producer (a timer that calls the same service).
  • crates/nest-rs-redis/src/connection.rs: QueueConnection::of and Queue::push.
  • crates/nest-rs-queue/src/producer.rs: JobProducer, JobProducerExt.