Scheduling Model

Graphile Worker RS schedules work by inserting rows into PostgreSQL. A job becomes runnable when four conditions hold: it is available, it is unlocked, it is below its retry limit, and its run_at timestamp is due.
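The runnable conditions can be pictured as a single predicate. The following is a minimal sketch, not the library's implementation: the JobRow struct, its field names, and the use of plain Unix seconds for timestamps are assumptions made for illustration, and the real check happens in the SQL that fetches jobs.

```rust
/// Hypothetical in-memory view of a job row (illustration only;
/// the actual check runs inside PostgreSQL).
#[derive(Debug, Clone)]
pub struct JobRow {
    pub is_available: bool,
    /// Unix seconds at which the job was locked; None means unlocked.
    pub locked_at: Option<i64>,
    pub attempts: u32,
    pub max_attempts: u32,
    /// Earliest Unix second at which the job may run.
    pub run_at: i64,
}

/// Returns true only when every condition named in the text holds.
pub fn is_runnable(job: &JobRow, now: i64) -> bool {
    job.is_available
        && job.locked_at.is_none()
        && job.attempts < job.max_attempts
        && job.run_at <= now
}
```

A job with run_at one second in the future, a lock, or an exhausted retry budget fails the predicate even if the other conditions hold.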

Most Rust applications schedule jobs through WorkerUtils:

let utils = worker.create_utils();

utils.add_job(
    SendEmail {
        to: "user@example.com".to_string(),
        subject: "Welcome".to_string(),
        body: "Thanks for signing up.".to_string(),
    },
    JobSpec::default(),
).await?;

Use add_raw_job when the task identifier is only known at runtime:

utils.add_raw_job(
    "send_email",
    serde_json::json!({
        "to": "user@example.com",
        "subject": "Welcome"
    }),
    JobSpec::default(),
).await?;

JobSpec

JobSpec contains the scheduling metadata sent to the database:

use chrono::Utc;
use graphile_worker::{JobKeyMode, JobSpecBuilder};

let spec = JobSpecBuilder::new()
    .queue_name("user:123")
    .run_at(Utc::now() + chrono::Duration::minutes(5))
    .max_attempts(5)
    .job_key("welcome-email:123")
    .job_key_mode(JobKeyMode::Replace)
    .priority(-10)
    .flags(vec!["email".to_string()])
    .build();

The database fills omitted values:

| Field | Meaning | Default |
| --- | --- | --- |
| queue_name | Optional queue used to serialize related jobs. | No queue |
| run_at | Earliest timestamp at which the job may run. | now() |
| max_attempts | Maximum attempts before the job is permanently unavailable. | 25 |
| job_key | Optional deduplication key. | No key |
| job_key_mode | How an existing keyed job is handled. | Replace |
| priority | Sort key for runnable jobs. Lower numbers run first. | 0 |
| flags | Optional flags stored on the job. | No flags |

When WorkerUtils is configured with with_use_local_time(true), Rust passes the application's current UTC time for omitted run_at values. Otherwise the database uses PostgreSQL now().

Immediate And Future Jobs

JobSpec::default() schedules a job immediately. To delay a job, set run_at:

use chrono::Utc;
use graphile_worker::JobSpecBuilder;

let spec = JobSpecBuilder::new()
    .run_at(Utc::now() + chrono::Duration::hours(1))
    .build();

utils.add_job(SendEmail { /* ... */ }, spec).await?;

Workers only fetch jobs whose run_at is less than or equal to the current time used by the worker query.

Priority

Runnable jobs are selected in ascending priority order, then ascending run_at. The default priority is 0, so negative values run before default-priority jobs and positive values run later.

let urgent = JobSpecBuilder::new()
    .priority(-10)
    .build();

let normal = JobSpec::default();

Priority is not a separate queue. It only affects ordering among jobs that are otherwise available to the worker.
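The ordering rule can be illustrated with an ordinary sort. This is a sketch under assumptions: the Candidate struct and the integer types are hypothetical, and the list is assumed to contain only jobs that are already runnable.

```rust
/// Hypothetical candidate row holding only the two sort keys from the text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Candidate {
    pub id: u64,
    pub priority: i32,
    /// Unix seconds.
    pub run_at: i64,
}

/// Returns job ids in selection order: ascending priority, then ascending run_at.
pub fn selection_order(mut jobs: Vec<Candidate>) -> Vec<u64> {
    jobs.sort_by_key(|job| (job.priority, job.run_at));
    jobs.into_iter().map(|job| job.id).collect()
}
```

With four jobs at priorities 0, -10, 5, and 0, the priority -10 job comes first, the two priority 0 jobs follow in run_at order, and the priority 5 job comes last, even if its run_at is the oldest.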

Job Keys

A job_key deduplicates jobs. Adding a job whose key matches an existing available job updates that row instead of creating a new one.

use graphile_worker::{JobKeyMode, JobSpecBuilder};

let spec = JobSpecBuilder::new()
    .job_key("sync-user:123")
    .job_key_mode(JobKeyMode::Replace)
    .build();

utils.add_job(SyncUser { user_id: 123 }, spec).await?;

The supported JobKeyMode values are:

| Mode | Behavior |
| --- | --- |
| Replace | Replace the existing available job's queue, task, payload, run_at, max_attempts, priority, and flags. Reset attempts and clear the last error. |
| PreserveRunAt | Replace the job, but keep the existing run_at when the existing job has not been attempted yet. |
| UnsafeDedupe | If the key already exists, only increment the revision and update updated_at; the existing payload and timing are left in place. |

For a keyed job that already exists but is no longer available, the database clears the old key and marks that old row as fully attempted before inserting the replacement.

Batch scheduling supports Replace and PreserveRunAt. UnsafeDedupe is rejected for add_jobs and add_raw_jobs. In a batch, if any job uses PreserveRunAt, the preserve-run-at behavior is applied to keyed conflicts in that batch.
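The difference between the three modes for a single keyed conflict can be sketched as a pure function. Everything here is a simplified model rather than the library's code: Mode, KeyedJob, and apply_keyed_add are hypothetical names, only a few columns are modeled, and bumping revision under Replace and PreserveRunAt is an assumption that the text above does not confirm.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

/// Hypothetical subset of the columns of an existing, available keyed job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyedJob {
    pub payload: String,
    pub run_at: i64,
    pub attempts: u32,
    pub last_error: Option<String>,
    pub revision: u32,
}

/// Models what happens to the existing row when a job with the same key is added.
pub fn apply_keyed_add(existing: &KeyedJob, new_payload: &str, new_run_at: i64, mode: Mode) -> KeyedJob {
    match mode {
        // Payload and timing stay in place; only the revision moves.
        Mode::UnsafeDedupe => KeyedJob {
            revision: existing.revision + 1,
            ..existing.clone()
        },
        Mode::Replace | Mode::PreserveRunAt => {
            // PreserveRunAt keeps the old run_at only for a never-attempted job.
            let keep_run_at = mode == Mode::PreserveRunAt && existing.attempts == 0;
            KeyedJob {
                payload: new_payload.to_string(),
                run_at: if keep_run_at { existing.run_at } else { new_run_at },
                attempts: 0,
                last_error: None,
                // Assumption: the revision is bumped here as well.
                revision: existing.revision + 1,
            }
        }
    }
}
```

In this model, PreserveRunAt behaves exactly like Replace once the existing job has been attempted at least once.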

Queues

queue_name groups jobs that must not run in parallel. Jobs with the same queue share a row in the private job queue table, and the worker locks that queue when it fetches a queued job.

let spec = JobSpecBuilder::new()
    .queue_name("account:123")
    .build();

utils.add_job(UpdateAccount { id: 123 }, spec).await?;

Use queues for per-user, per-account, or per-resource workflows where ordering or mutual exclusion matters. Jobs without a queue are not serialized by this mechanism.
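The mutual exclusion that a queue provides can be modeled in memory. This sketch assumes a hypothetical Queued struct and represents queue locks as a set of queue names; in the real system the lock lives on a row in the private job queue table.

```rust
use std::collections::HashSet;

/// Hypothetical pending job: an id plus an optional queue name.
#[derive(Debug, Clone)]
pub struct Queued {
    pub id: u64,
    pub queue_name: Option<String>,
}

/// Removes and returns the first pending job whose queue is not locked,
/// locking that queue. Jobs without a queue are never blocked.
pub fn fetch_next(pending: &mut Vec<Queued>, locked_queues: &mut HashSet<String>) -> Option<Queued> {
    let index = pending.iter().position(|job| match &job.queue_name {
        Some(queue) => !locked_queues.contains(queue),
        None => true,
    })?;
    let job = pending.remove(index);
    if let Some(queue) = &job.queue_name {
        locked_queues.insert(queue.clone());
    }
    Some(job)
}
```

While one job from account:123 is running, a second job in the same queue is skipped, but an unqueued job behind it can still be fetched. The second queued job becomes fetchable once the queue name is removed from the lock set.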

Batch Scheduling

Use add_jobs or add_raw_jobs to insert many jobs in one database round trip:

let spec = JobSpec::default();

utils.add_jobs::<SendEmail>(&[
    (SendEmail { /* ... */ }, &spec),
    (SendEmail { /* ... */ }, &spec),
]).await?;

For batch task handlers, add_batch_job stores a JSON array in a single job. When a keyed batch job is replaced and both the existing and new payloads are arrays, the database appends the new array to the existing array.
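The array-append rule for keyed batch jobs can be sketched without a JSON library. The Payload enum below is a hypothetical stand-in for a JSON value, and the fallback of taking the incoming payload when either side is not an array is an assumption based on ordinary replace behavior, not something the text states.

```rust
/// Hypothetical stand-in for a JSON payload: either an array of items
/// or any other value, kept as raw text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Payload {
    Array(Vec<String>),
    Other(String),
}

/// Merges the payload of an existing keyed batch job with an incoming one.
pub fn merge_batch_payload(existing: Payload, incoming: Payload) -> Payload {
    match (existing, incoming) {
        // Both arrays: the new items are appended to the existing ones.
        (Payload::Array(mut old), Payload::Array(new)) => {
            old.extend(new);
            Payload::Array(old)
        }
        // Assumption: in every other case the incoming payload wins.
        (_, incoming) => incoming,
    }
}
```

Appending keeps the existing items first, so a handler that processes the array in order sees earlier submissions before later ones.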

Cron

Cron schedules are configured on WorkerOptions, not by manually looping in application code. The typed API builds cron definitions in Rust:

use graphile_worker::{Cron, CrontabFill, WorkerOptions};

let worker = WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron(
        Cron::daily_at::<SendDailyReport>(8, 0)?
            .fill(CrontabFill::hours(1)),
    )
    .init()
    .await?;

Crontab text is also supported:

let worker = WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron("0 8 * * * send_daily_report")?
    .init()
    .await?;

On each cron tick, the runner checks which crontabs match the current local timestamp and schedules matching jobs through the same database add_job function used by one-off jobs. Cron jobs can carry payload, queue, run_at, max_attempts, priority, job key, and job key mode metadata from the crontab definition. If the payload is a JSON object, the runner adds a _cron object with the scheduled timestamp and whether the job was backfilled.

The runner records known crontabs and their last execution timestamp so a cron tick is only scheduled once for a given crontab execution.
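The once-per-tick guarantee amounts to a compare-and-record step on the last execution timestamp. The sketch below keeps that state in a HashMap; KnownCrontabs and claim_tick are hypothetical names, timestamps are plain Unix seconds, and the real runner persists this state in PostgreSQL rather than in memory.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory record of each crontab's last executed tick.
#[derive(Debug, Default)]
pub struct KnownCrontabs {
    last_execution: HashMap<String, i64>,
}

impl KnownCrontabs {
    /// Returns true and records the tick if this crontab has not yet run
    /// for `tick`; returns false if the tick was already claimed.
    pub fn claim_tick(&mut self, identifier: &str, tick: i64) -> bool {
        let already_claimed = self
            .last_execution
            .get(identifier)
            .map_or(false, |&last| last >= tick);
        if already_claimed {
            return false;
        }
        self.last_execution.insert(identifier.to_string(), tick);
        true
    }
}
```

Claiming the same tick twice for one identifier succeeds only the first time, while a later tick or a different identifier is unaffected.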

Retry Metadata

The worker increments attempts when it locks a job to run it. A job remains available only while attempts < max_attempts.

When a job fails, the failure query stores last_error, unlocks the job, and sets a future run_at using exponential backoff:

greatest(now(), run_at) + exp(least(attempts, 10)) seconds

This means the retry delay grows exponentially with the attempt count and is capped because the exponent never exceeds 10: the largest delay is exp(10) seconds, about 22,026 seconds or roughly 6.1 hours. Once attempts reaches max_attempts, the job is no longer available for normal fetching.
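The backoff expression can be evaluated directly to see how the delay grows. This is a sketch of the arithmetic only: the function names are hypothetical, timestamps are Unix seconds as f64, and the actual computation runs in the failure query inside PostgreSQL.

```rust
/// Delay in seconds before the next retry: exp(least(attempts, 10)).
pub fn retry_delay_seconds(attempts: u32) -> f64 {
    f64::from(attempts.min(10)).exp()
}

/// Next run_at in Unix seconds: greatest(now, run_at) plus the retry delay.
pub fn next_run_at(now: f64, run_at: f64, attempts: u32) -> f64 {
    now.max(run_at) + retry_delay_seconds(attempts)
}
```

Under this formula the delay is about 2.7 seconds after the first attempt, about 148 seconds after the fifth, and about 22,026 seconds from the tenth attempt onward.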

Existing jobs can be changed with the management utilities, for example by rescheduling them. RescheduleJobOptions can update run_at, priority, attempts, and max_attempts for selected job ids.