Schedule

A scheduled job is just a struct with #[cron_job(...)] — the same DI shape as every other provider. The Scheduler transport drains the registry at boot, validates each trigger, and dispatches the jobs in-process — no broker, no second binary.

Triggers are validated at compile time (string literals) or at boot (presets, timezones); a bad cron expression fails the boot naming the job.

This is the real producer from apps/worker — every 5 seconds it generates a fake transcode task and pushes it onto a queue:

apps/worker/src/audio/producer.rs

use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::Result;
use nestrs_queue::QueueConnection;
use nestrs_schedule::{async_trait, cron_job, CronExpression, Scheduled};

use crate::audio::dto::{TranscodeJob, AUDIO_QUEUE};

#[cron_job(cron = CronExpression::EVERY_5_SECONDS)]
pub struct AudioProducer {
    #[inject]
    queue: Arc<QueueConnection>,
}

#[async_trait]
impl Scheduled for AudioProducer {
    async fn run(&self) -> Result<()> {
        let id = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
        let file = format!("track-{id}.mp3");
        self.queue
            .of::<TranscodeJob>(AUDIO_QUEUE)
            .push(TranscodeJob { file: file.clone() })
            .await?;
        tracing::info!(target: "worker::audio", %file, "queued transcode job");
        Ok(())
    }
}

  • #[cron_job(cron = ...)] decorates the struct. The macro registers it with the scheduler discovery registry and records its injected dependencies for the access graph.
  • #[inject] queue: Arc<QueueConnection> — the container hands the job whatever it needs. No magical singletons.
  • impl Scheduled for AudioProducer is the body. The framework calls run on each tick. A Result::Err is logged at the error level, and the job is retried on its next scheduled tick.
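
The error behaviour described in the last bullet can be sketched with the standard library alone. This is a minimal illustration, not the nestrs implementation: Job is a synchronous stand-in for the async Scheduled trait, and Tick, drive, and Flaky are hypothetical names introduced here.

```rust
/// Synchronous stand-in for the async `Scheduled` trait (illustration only).
pub trait Job {
    fn name(&self) -> &str;
    fn run(&mut self) -> Result<(), String>;
}

/// Outcome of one tick, as a scheduler loop might record it.
#[derive(Debug, PartialEq)]
pub enum Tick {
    Succeeded,
    Failed(String),
}

/// Drive `job` for `ticks` iterations. An Err is reported (a real scheduler
/// would log it at the error level) and the loop moves on to the next tick.
pub fn drive(job: &mut dyn Job, ticks: usize) -> Vec<Tick> {
    (0..ticks)
        .map(|_| match job.run() {
            Ok(()) => Tick::Succeeded,
            Err(e) => {
                eprintln!("ERROR job={} error={}", job.name(), e);
                Tick::Failed(e)
            }
        })
        .collect()
}

/// Hypothetical job that fails on its second run only.
pub struct Flaky {
    pub runs: u32,
}

impl Job for Flaky {
    fn name(&self) -> &str {
        "Flaky"
    }

    fn run(&mut self) -> Result<(), String> {
        self.runs += 1;
        if self.runs == 2 {
            Err("queue unavailable".to_string())
        } else {
            Ok(())
        }
    }
}
```

Calling drive(&mut Flaky { runs: 0 }, 3) records a failure for the second tick only; the third tick still runs, which is the point of treating an error as "wait for the next tick" rather than "stop the job".
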

Form                                          Meaning
every = "30s"                                 Fixed interval (units: s, m, h, d)
cron = "0 */5 * * * *"                        6-field cron expression (with a seconds field)
cron = CronExpression::EVERY_5_SECONDS        Named preset (compile-time validated)
after = "10s"                                 One-shot: runs once, that long after boot, then never again
cron = "0 0 9 * * MON", tz = "Europe/Paris"   Cron plus a named timezone

A literal cron expression is validated at compile time; presets and timezones at boot. Either way, an invalid trigger fails the boot naming the job and the offending value — never silently skipped.
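
To make the interval form concrete, here is a minimal sketch of how a value such as "30s" could be turned into a Duration and rejected when malformed. The function parse_interval is hypothetical and uses only the standard library; the real parser in nestrs-schedule may accept more forms or report errors differently.

```rust
use std::time::Duration;

/// Parse an interval such as "30s", "5m", "1h" or "2d" into a Duration.
/// Hypothetical helper: it mirrors the `every = "..."` form only in spirit.
pub fn parse_interval(spec: &str) -> Result<Duration, String> {
    let spec = spec.trim();
    // Byte index of the last character, which is expected to be the unit.
    let split = spec
        .char_indices()
        .last()
        .map(|(i, _)| i)
        .ok_or_else(|| "empty interval".to_string())?;
    let (digits, unit) = spec.split_at(split);
    let n: u64 = digits
        .parse()
        .map_err(|_| format!("invalid number in interval {spec:?}"))?;
    let secs_per_unit: u64 = match unit {
        "s" => 1,
        "m" => 60,
        "h" => 3_600,
        "d" => 86_400,
        other => return Err(format!("unknown unit {other:?} in interval {spec:?}")),
    };
    // Guard against overflow instead of wrapping silently.
    n.checked_mul(secs_per_unit)
        .map(Duration::from_secs)
        .ok_or_else(|| format!("interval {spec:?} overflows"))
}
```

Returning an error that quotes the offending value keeps the failure easy to trace back to the attribute that declared it.
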

apps/worker/src/audio/module.rs

use nestrs_core::module;

use super::{producer::AudioProducer, processor::AudioProcessor, service::Transcoder};

#[module(providers = [AudioProducer, AudioProcessor, Transcoder])]
pub struct AudioModule;

apps/worker/src/main.rs

use nestrs_core::App;
use nestrs_schedule::Scheduler;
use worker::AppModule;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    App::builder()
        .module::<AppModule>()
        .build()
        .await?
        .transport(Scheduler::new()) // ← runs every #[cron_job]
        .run()
        .await
}

The Scheduler transport is what drains the registry and runs the jobs. Importing nestrs-schedule without mounting Scheduler::new() means the jobs compile in but never tick — a deliberate choice: an HTTP app linking the features crate doesn’t accidentally run a worker’s schedule.
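
The split between registering a job and running it can be sketched as follows. This is an assumption-laden illustration using only the standard library: Registry, JobEntry, and this Scheduler with its mount function are hypothetical stand-ins, not the types exported by nestrs-schedule.

```rust
use std::sync::Mutex;

/// One registered job: a name plus the trigger text it was declared with.
#[derive(Debug, Clone, PartialEq)]
pub struct JobEntry {
    pub name: &'static str,
    pub trigger: &'static str,
}

/// Stand-in for the discovery registry that the attribute macro fills.
pub struct Registry {
    entries: Mutex<Vec<JobEntry>>,
}

impl Registry {
    pub fn new() -> Self {
        Registry { entries: Mutex::new(Vec::new()) }
    }

    /// Registration is passive: it records the job and starts nothing.
    pub fn register(&self, name: &'static str, trigger: &'static str) {
        self.entries.lock().unwrap().push(JobEntry { name, trigger });
    }

    /// Take every registered job, leaving the registry empty.
    pub fn drain(&self) -> Vec<JobEntry> {
        let mut guard = self.entries.lock().unwrap();
        std::mem::take(&mut *guard)
    }

    pub fn len(&self) -> usize {
        self.entries.lock().unwrap().len()
    }
}

/// Stand-in for the transport: jobs are picked up only when it is mounted.
pub struct Scheduler {
    pub jobs: Vec<JobEntry>,
}

impl Scheduler {
    pub fn mount(registry: &Registry) -> Self {
        Scheduler { jobs: registry.drain() }
    }
}
```

In this sketch a binary that never calls Scheduler::mount leaves the entries sitting in the registry untouched, which mirrors the behaviour described above for an HTTP app that links the crate without mounting the transport.
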

$ just dev worker
2026-06-03T10:18:41Z INFO nestrs::schedule: registered 1 cron job
    AudioProducer  every 5s
2026-06-03T10:18:46Z INFO worker::audio: queued transcode job file=track-1717405126521.mp3
2026-06-03T10:18:51Z INFO worker::audio: queued transcode job file=track-1717405131524.mp3

A bad trigger fails the boot:

$ just dev worker
Error: invalid cron expression for job 'NightlyReport'
    expression : "every monday"
    reason     : missing seconds field
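
A boot-time check that names the job and the offending value might look like the sketch below. TriggerError and validate_all are hypothetical, the check only counts fields, and the message layout merely imitates the output shown above; the real scheduler validates far more than this.

```rust
use std::fmt;

/// Error carrying everything needed to locate the bad trigger.
#[derive(Debug, PartialEq)]
pub struct TriggerError {
    pub job: String,
    pub expression: String,
    pub reason: String,
}

impl fmt::Display for TriggerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "invalid cron expression for job '{}'", self.job)?;
        writeln!(f, "    expression : {:?}", self.expression)?;
        write!(f, "    reason     : {}", self.reason)
    }
}

/// Validate every (job name, cron expression) pair and stop at the first
/// bad one. Only the field count is checked in this sketch.
pub fn validate_all(jobs: &[(&str, &str)]) -> Result<(), TriggerError> {
    for (job, expr) in jobs {
        let fields = expr.split_whitespace().count();
        if fields != 6 {
            return Err(TriggerError {
                job: job.to_string(),
                expression: expr.to_string(),
                reason: format!("expected 6 fields, found {fields}"),
            });
        }
    }
    Ok(())
}
```

Failing on the first invalid entry, rather than skipping it, is what keeps a misconfigured job from disappearing silently.
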

A job has no caller: it is system work by design. The framework installs a pool executor in the ambient data context, so Repo reads go through the pool. It installs no ability, so Repo reads are unscoped, which is correct for system work.

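use std::sync::Arc;

use anyhow::Result;
use nestrs_schedule::{async_trait, cron_job, Scheduled};
// ItemsService is application code; import it from wherever it is defined.

#[cron_job(every = "1h")]
pub struct CleanupExpired {
    #[inject]
    svc: Arc<ItemsService>,
}

#[async_trait]
impl Scheduled for CleanupExpired {
    async fn run(&self) -> Result<()> {
        // ItemsService uses Repo<Items>::conn(), which picks up the pool executor.
        self.svc.delete_expired().await?;
        Ok(())
    }
}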

Just importing DatabaseModule is enough — no manual connection plumbing.
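
The idea of an ambient executor can be illustrated with a thread-local slot. This is only a sketch under stated assumptions: Executor, with_executor, and current_executor are hypothetical names, and an async framework would more plausibly use a task-local than a thread-local.

```rust
use std::cell::RefCell;

/// Which executor is ambient. The variants are illustrative, not nestrs types.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Executor {
    Pool,
    Transaction,
}

thread_local! {
    // Nothing is installed until a scope installs it.
    static AMBIENT: RefCell<Option<Executor>> = RefCell::new(None);
}

/// Install `exec` for the duration of `f`, then restore the previous value.
/// The slot is not restored if `f` panics; a guard type would handle that.
pub fn with_executor<T>(exec: Executor, f: impl FnOnce() -> T) -> T {
    let previous = AMBIENT.with(|slot| slot.replace(Some(exec)));
    let out = f();
    AMBIENT.with(|slot| *slot.borrow_mut() = previous);
    out
}

/// What a repository would call to find its connection source.
pub fn current_executor() -> Option<Executor> {
    AMBIENT.with(|slot| *slot.borrow())
}
```

Wrapping a job's run in with_executor(Executor::Pool, ...) means code several calls deep can ask current_executor() and get the pool without it being passed as an argument, which is the kind of plumbing the section says the job author never writes.
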

  • Queue — pair a scheduled producer with a durable consumer for distributed work.
  • Observability — every job span uses the nestrs::schedule target; the structured fields job and ran_for are recorded automatically.
  • apps/worker/src/audio/producer.rs — the canonical interval producer.
  • crates/nestrs-schedule/: #[cron_job], Scheduler, CronExpression, and the Scheduled trait.