Architecture

Graphile Worker RS is a PostgreSQL-backed job queue. Application code writes jobs into the Graphile Worker schema, worker processes lock ready jobs with PostgreSQL row locks, task handlers run in the configured async runtime, and the worker persists completion, retry, failure, or recovery state back to the database.

The public crate is split into a small set of user-facing concepts:

  • WorkerOptions configures a worker, registers task handlers, and creates a Worker.
  • Worker runs the job loop until shutdown.
  • WorkerUtils adds and manages jobs outside the worker loop.
  • TaskHandler defines the Rust code that handles a job payload.
  • JobSpec configures scheduling, queues, attempts, priority, flags, and job keys for inserted jobs.

Internally, the root crate coordinates smaller workspace crates. The query crate owns SQL construction, the job/task/spec crates define shared data structures, the lifecycle hook crate exposes extension points, and the runtime facade keeps the worker logic independent of which async runtime feature is enabled.

Data Flow

The normal path from enqueue to release is:

  1. Application code adds a job through WorkerUtils or the lower-level add_job SQL wrapper.
  2. PostgreSQL stores the job in the worker schema and emits the database notification used by listeners.
  3. A running worker receives a signal from its LISTEN on the jobs:insert channel, from periodic polling, from run_once, or from an internal local-queue signal.
  4. Worker tasks fetch ready jobs with for update skip locked, filtered to the task identifiers registered in this worker.
  5. The worker builds a WorkerContext, runs the matching TaskHandler, and catches task errors and panics.
  6. The release path deletes completed jobs, reschedules retryable failures, or returns interrupted jobs for recovery.
application
  |
  | add_job(identifier, payload, JobSpec)
  v
PostgreSQL worker schema
  |
  | LISTEN/NOTIFY or polling
  v
Worker job loop
  |
  | get_job / batch_get_jobs
  v
TaskHandler::run(WorkerContext)
  |
  +-- success ----------> complete_job: delete job, unlock queue
  |
  +-- task failure -----> fail_job: save error, unlock, retry later
  |
  +-- shutdown abort ---> recovery return: decrement attempt, unlock, delay
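The three branches of the diagram amount to a mapping from handler outcome to release action. This is a minimal sketch: TaskOutcome, ReleaseAction, and release_action are hypothetical names for illustration, not types exported by the crate.

```rust
/// Hypothetical handler outcomes, one per branch of the diagram.
#[derive(Debug, PartialEq)]
enum TaskOutcome {
    Success,
    Failure { error: String },
    ShutdownAbort,
}

/// Hypothetical release actions.
#[derive(Debug, PartialEq)]
enum ReleaseAction {
    /// complete_job: delete the row and unlock its queue.
    Complete,
    /// fail_job: persist the error, unlock, and schedule a retry with backoff.
    Fail { last_error: String },
    /// Recovery return: decrement attempts, unlock, and apply the interruption delay.
    Recover,
}

/// Choose the release path for a finished (or aborted) handler run.
fn release_action(outcome: TaskOutcome) -> ReleaseAction {
    match outcome {
        TaskOutcome::Success => ReleaseAction::Complete,
        TaskOutcome::Failure { error } => ReleaseAction::Fail { last_error: error },
        TaskOutcome::ShutdownAbort => ReleaseAction::Recover,
    }
}
```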

Adding Jobs

The query layer inserts jobs by calling the schema's add_job database function with the task identifier, JSON payload, and JobSpec fields. Application code usually reaches that path through WorkerUtils:

let job = worker
    .create_utils()
    .add_raw_job(
        "send_email",
        serde_json::json!({ "to": "a@example.com" }),
        graphile_worker::JobSpec::default(),
    )
    .await?;

The SQL wrapper passes these options to PostgreSQL:

  • identifier: the task name registered by a handler.
  • payload: the JSON payload stored with the job.
  • queue_name: optional queue serialization key.
  • run_at: optional scheduled time.
  • max_attempts: optional retry limit.
  • job_key and job_key_mode: optional job de-duplication/update behavior.
  • priority: lower values are fetched first.
  • flags: labels that workers may skip through forbidden_flags.

When tracing context is active, the insert path can add trace information into the payload before writing it. If use_local_time is enabled and no run_at is supplied, the Rust process time is used as the job's run time; otherwise the SQL layer uses database time.
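The run_at rule in the last sentence can be sketched as a small decision function. The helper resolve_run_at and its parameters are hypothetical; returning None stands for "omit run_at and let PostgreSQL use now()", and the real insert path may express this differently.

```rust
use std::time::SystemTime;

/// Hypothetical helper: decide which run_at value to pass to the SQL add_job call.
/// `None` means "leave run_at unset so the database clock is used".
fn resolve_run_at(
    requested: Option<SystemTime>,
    use_local_time: bool,
    local_now: SystemTime,
) -> Option<SystemTime> {
    match requested {
        // An explicit schedule always wins.
        Some(at) => Some(at),
        // No schedule, but the worker is configured to use the process clock.
        None if use_local_time => Some(local_now),
        // No schedule: defer to PostgreSQL's now().
        None => None,
    }
}
```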

Fetching Jobs

Workers do not scan every job type. During initialization they know the task identifiers they can run, and fetch only jobs whose task_id is in that set. The fetch query requires:

  • is_available = true
  • run_at <= now(), or the supplied local timestamp
  • no forbidden flags selected by this worker
  • an available queue when the job belongs to a named queue

Jobs are ordered by priority asc, run_at asc, then locked with:

for update
skip locked

After selecting a row, the same query increments attempts, sets locked_by to the worker id, and records locked_at. For queued jobs, the related job_queues row is also locked by the worker. This is the core coordination mechanism that lets multiple worker processes share the same PostgreSQL queue without taking the same job.
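The filter, ordering, and lock-claiming steps can be simulated in memory to show how they interact. This is a sketch under stated assumptions: JobRow and get_job are hypothetical stand-ins for the jobs table and the fetch query, is_available is assumed to mean "unlocked and attempts < max_attempts", and the locked_by check replaces for update skip locked because a single-threaded sketch has no concurrent transactions to skip.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory stand-in for a row of the jobs table.
#[derive(Debug, Clone)]
struct JobRow {
    id: i64,
    task_identifier: String,
    queue_name: Option<String>,
    priority: i16,
    run_at: i64, // plain seconds keep the sketch dependency-free
    attempts: i16,
    max_attempts: i16,
    flags: Vec<String>,
    locked_by: Option<String>,
}

impl JobRow {
    // Assumed meaning of is_available: unlocked and not out of attempts.
    fn is_available(&self) -> bool {
        self.locked_by.is_none() && self.attempts < self.max_attempts
    }
}

/// Pick and lock one job using the same filters and ordering as the fetch query.
/// `locked_queues` maps a queue name to the worker id currently holding it.
fn get_job(
    jobs: &mut [JobRow],
    locked_queues: &mut HashMap<String, String>,
    task_ids: &HashSet<String>,
    forbidden_flags: &HashSet<String>,
    now: i64,
    worker_id: &str,
) -> Option<JobRow> {
    let index = jobs
        .iter()
        .enumerate()
        .filter(|(_, j)| task_ids.contains(&j.task_identifier))
        .filter(|(_, j)| j.is_available() && j.run_at <= now)
        .filter(|(_, j)| !j.flags.iter().any(|f| forbidden_flags.contains(f)))
        .filter(|(_, j)| match &j.queue_name {
            Some(q) => !locked_queues.contains_key(q),
            None => true,
        })
        // priority asc, run_at asc; id is added only to make the sketch deterministic.
        .min_by_key(|(_, j)| (j.priority, j.run_at, j.id))
        .map(|(i, _)| i)?;

    // Claim the row: bump attempts and record the owner, as the query does.
    let job = &mut jobs[index];
    job.attempts += 1;
    job.locked_by = Some(worker_id.to_string());
    if let Some(q) = &job.queue_name {
        locked_queues.insert(q.clone(), worker_id.to_string());
    }
    Some(job.clone())
}
```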

Worker Loop

Worker::run() performs the runtime lifecycle in this order:

  1. Register the worker when recovery support is enabled.
  2. Emit worker start hooks.
  3. Spawn recovery background tasks.
  4. Run the crontab scheduler and job runner together.
  5. Wait for completion and failure batchers to flush on shutdown.
  6. Stop recovery tasks, emit shutdown hooks, and deregister the worker.

The job runner listens for job signals. A signal can come from the PostgreSQL listener, the polling interval, run_once, or the local queue. Each signal is fanned out to the configured concurrency so idle worker tasks can attempt to fetch jobs. Signals are wakeup hints rather than durable work items, so the dispatcher coalesces them when worker tasks are already saturated. This keeps PostgreSQL notification listeners drained while polling still provides the fallback path for missed or future work.
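Signal coalescing can be approximated with a bounded channel whose capacity equals the worker concurrency: once every slot holds a pending wakeup, further wakeups are dropped rather than queued. This sketch uses std::sync::mpsc::sync_channel as a stand-in for the crate's async dispatcher; wakeup_channel and fan_out_signal are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical wakeup channel: at most `concurrency` wakeups can be pending.
fn wakeup_channel(concurrency: usize) -> (SyncSender<()>, Receiver<()>) {
    sync_channel(concurrency)
}

/// Fan one job signal out to every worker slot. Returns how many wakeups were
/// queued; the rest are coalesced because the slots are already full.
fn fan_out_signal(tx: &SyncSender<()>, concurrency: usize) -> usize {
    let mut queued = 0;
    for _ in 0..concurrency {
        match tx.try_send(()) {
            Ok(()) => queued += 1,
            // Saturated: a wakeup is only a hint, so dropping it loses no work.
            Err(TrySendError::Full(())) => break,
            // No receiver left, e.g. the worker is shutting down.
            Err(TrySendError::Disconnected(())) => break,
        }
    }
    queued
}
```

Dropping a wakeup is safe for the reason given above: the job it would have found is still in PostgreSQL, and the next signal or poll picks it up.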

There are two fetch modes:

  • Direct mode fetches one job per worker task with get_job.
  • Local queue mode prefetches jobs with batch_get_jobs and wakes worker tasks through an internal signal when cached jobs are available.

Both modes end in the same execution and release logic.
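Local queue mode can be pictured as a prefetch cache that only goes back to the database when it runs dry. In this minimal sketch, LocalQueue and next_job are hypothetical, job ids stand in for full job rows, and the fetch_batch closure stands in for batch_get_jobs; the internal signal that wakes worker tasks is not modelled.

```rust
use std::collections::VecDeque;

/// Hypothetical prefetch cache. `fetch_batch` receives the batch size and
/// returns up to that many locked job ids (standing in for batch_get_jobs).
struct LocalQueue<F: FnMut(usize) -> Vec<i64>> {
    cached: VecDeque<i64>,
    batch_size: usize,
    fetch_batch: F,
}

impl<F: FnMut(usize) -> Vec<i64>> LocalQueue<F> {
    fn new(batch_size: usize, fetch_batch: F) -> Self {
        Self { cached: VecDeque::new(), batch_size, fetch_batch }
    }

    /// Hand the next job to a worker task, refilling from the database
    /// only when the cache is empty.
    fn next_job(&mut self) -> Option<i64> {
        if self.cached.is_empty() {
            let batch = (self.fetch_batch)(self.batch_size);
            self.cached.extend(batch);
        }
        self.cached.pop_front()
    }
}
```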

Running a Task

After a job is fetched, the worker looks up the registered handler by the job's task identifier. If the job has no task identifier, or no handler is registered for it, the job is released through the failure path.
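The lookup can be sketched as a map from task identifier to handler, where both a missing identifier and an unregistered task produce an error that feeds the failure path. The Handler alias and run_fetched_job are hypothetical; in the crate, handlers are registered through TaskHandler implementations rather than boxed closures.

```rust
use std::collections::HashMap;

/// Hypothetical handler shape: raw payload in, Ok or an error message out.
type Handler = Box<dyn Fn(&str) -> Result<(), String>>;

/// Resolve and run the handler for a fetched job. Any error returned here
/// would be persisted through the failure path.
fn run_fetched_job(
    registry: &HashMap<String, Handler>,
    task_identifier: Option<&str>,
    payload: &str,
) -> Result<(), String> {
    let identifier = task_identifier.ok_or_else(|| "job has no task identifier".to_string())?;
    let handler = registry
        .get(identifier)
        .ok_or_else(|| format!("no handler registered for task '{identifier}'"))?;
    handler(payload)
}
```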

For a valid job, the worker creates a WorkerContext containing the shared job, database handle, schema, worker id, extensions, task details, and time mode. It then runs the handler future:

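use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        // self is the deserialized payload; ctx exposes the job, database handle, and extensions.
        Ok::<(), String>(())
    }
}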

The execution path records task duration for successful jobs, catches panics, and watches the worker shutdown signal. If shutdown is requested, the worker waits for the configured grace period. A task that has not finished by then is treated as aborted and goes through the recovery release path.
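Panic catching and the grace-period deadline can be sketched with standard-library threads in place of the configured async runtime. Assumptions: run_with_grace and RunResult are hypothetical names, the deadline here runs from the start of the task rather than from the moment shutdown is requested, and a timed-out thread keeps running in the background because std threads cannot be cancelled.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical classification of a handler run.
#[derive(Debug, PartialEq)]
enum RunResult {
    Completed,
    Failed(String),
    Aborted, // did not finish within the grace period
}

/// Run `task` on its own thread, turning panics into failures, and stop
/// waiting after `grace` (standing in for the shutdown grace period).
fn run_with_grace<F>(task: F, grace: Duration) -> RunResult
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let outcome = panic::catch_unwind(AssertUnwindSafe(task));
        // The receiver may already be gone if we timed out; ignore that.
        let _ = tx.send(outcome);
    });
    match rx.recv_timeout(grace) {
        Ok(Ok(Ok(()))) => RunResult::Completed,
        Ok(Ok(Err(message))) => RunResult::Failed(message),
        Ok(Err(_panic_payload)) => RunResult::Failed("task panicked".to_string()),
        // Timed out (or the channel disconnected): route to recovery.
        Err(_) => RunResult::Aborted,
    }
}
```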

Completion and Failure

Successful jobs are completed by deleting the job row. If the job belongs to a queue, completion also clears locked_by and locked_at on the queue row, but only when that queue is locked by the same worker id.
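The ownership check on the queue row behaves like a guarded update. QueueRow and unlock_queue_if_owner are hypothetical in-memory stand-ins for the job_queues row and the completion SQL.

```rust
/// Hypothetical in-memory job_queues row.
#[derive(Debug, Clone, PartialEq)]
struct QueueRow {
    queue_name: String,
    locked_by: Option<String>,
    locked_at: Option<u64>,
}

/// Clear the queue lock only if `worker_id` still owns it.
/// Returns whether the row was changed.
fn unlock_queue_if_owner(queue: &mut QueueRow, worker_id: &str) -> bool {
    if queue.locked_by.as_deref() == Some(worker_id) {
        queue.locked_by = None;
        queue.locked_at = None;
        true
    } else {
        // Another worker holds the lock; leave it untouched.
        false
    }
}
```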

Failed jobs are updated instead of deleted:

  • last_error is set to the persisted error message.
  • run_at is moved forward by exponential backoff based on attempts.
  • locked_by and locked_at are cleared.
  • a replacement payload may be written when the handler returned one.
  • queued jobs also unlock their queue row.

The worker decides whether a failure will retry by comparing attempts with max_attempts. Retryable failures emit job-fail hooks; exhausted jobs emit permanent-failure hooks. The SQL update has the same shape in both cases: a permanently failed job stays in the table, unlocked and with its last error recorded, and the schema's availability rules stop it from being fetched again because attempts has reached max_attempts.
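The failure update and the retry decision act on the same row, so they can be sketched together. FailedJob, FailureHook, and fail_job are hypothetical in-memory names. The backoff curve is an assumption: backoff_secs uses e^min(attempts, 10) seconds, added to the later of now and the current run_at, which is our understanding of upstream Graphile Worker's fail_job; check this schema's SQL before relying on the exact numbers. The replacement payload and queue-row unlock are left out.

```rust
/// Hypothetical job row with only the fields the failure update touches.
#[derive(Debug, Clone)]
struct FailedJob {
    attempts: i32,
    max_attempts: i32,
    run_at_secs: f64,
    last_error: Option<String>,
    locked_by: Option<String>,
    locked_at: Option<f64>,
}

#[derive(Debug, PartialEq)]
enum FailureHook {
    JobFail,          // will be retried
    PermanentFailure, // attempts exhausted
}

/// ASSUMED backoff curve: e^min(attempts, 10) seconds.
fn backoff_secs(attempts: i32) -> f64 {
    f64::from(attempts.min(10)).exp()
}

/// Apply the failure update, then report which hook family would fire.
fn fail_job(job: &mut FailedJob, error: &str, now_secs: f64) -> FailureHook {
    job.last_error = Some(error.to_string());
    job.run_at_secs = job.run_at_secs.max(now_secs) + backoff_secs(job.attempts);
    job.locked_by = None;
    job.locked_at = None;
    // The row update is identical either way; only the hook differs.
    if job.attempts < job.max_attempts {
        FailureHook::JobFail
    } else {
        FailureHook::PermanentFailure
    }
}
```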

Completion and ordinary failures can be batched. The completion and failure batchers receive release requests through bounded channels, flush them after the configured delay, and emit hooks only for rows that were actually persisted. If a batcher is already closed, the worker falls back to direct persistence for that job.
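A batcher with a bounded channel, a flush delay, and a direct-persistence fallback can be sketched with standard-library threads. Everything here is hypothetical: spawn_batcher and release are illustrative names, job ids stand in for release requests, the flush closure stands in for one multi-row SQL statement and returns the ids it actually persisted, and the direct vector stands in for the non-batched write path.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Spawn a batcher that flushes ids once `delay` has passed since the first id
/// of each batch. The join handle yields every id that flush reported as persisted.
fn spawn_batcher<F>(
    capacity: usize,
    delay: Duration,
    mut flush: F,
) -> (SyncSender<i64>, JoinHandle<Vec<i64>>)
where
    F: FnMut(&[i64]) -> Vec<i64> + Send + 'static,
{
    let (tx, rx): (SyncSender<i64>, Receiver<i64>) = sync_channel(capacity);
    let handle = thread::spawn(move || {
        let mut persisted = Vec::new();
        // Wait for the first id of a batch; exit once every sender is dropped.
        while let Ok(first) = rx.recv() {
            let mut batch = vec![first];
            let deadline = Instant::now() + delay;
            // Keep collecting until the flush delay elapses.
            while let Some(left) = deadline.checked_duration_since(Instant::now()) {
                match rx.recv_timeout(left) {
                    Ok(id) => batch.push(id),
                    Err(_) => break, // timed out or disconnected
                }
            }
            // Hooks would fire only for the ids flush actually persisted.
            persisted.extend(flush(&batch[..]));
        }
        persisted
    });
    (tx, handle)
}

/// Hand a release to the batcher, or persist it directly if the batcher is closed.
fn release(tx: &SyncSender<i64>, id: i64, direct: &mut Vec<i64>) {
    if let Err(err) = tx.send(id) {
        // SendError gives back the value that could not be sent.
        direct.push(err.0);
    }
}
```

Because the channel is bounded, send blocks when the batcher falls behind, which applies backpressure to the release path instead of buffering without limit.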

Shutdown and Recovery

Shutdown aborts are intentionally not normal task failures. When a running job is interrupted by the shutdown grace-period timeout, the worker applies job recovery instead of failure backoff:

  • attempts is decremented with a floor of zero.
  • locked_by and locked_at are cleared.
  • run_at is delayed by interrupted_job_retry_delay when configured.
  • queued jobs also unlock their queue row.
  • interruption hooks are emitted when recovery handled the job.
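The recovery update is a small in-place transformation of the job row. InterruptedJob and recover_interrupted are hypothetical, and the sketch assumes the delay is measured from the time of recovery; queue-row unlocking and hook emission are omitted.

```rust
/// Hypothetical in-memory row for an interrupted job.
#[derive(Debug, Clone, PartialEq)]
struct InterruptedJob {
    attempts: u16,
    run_at_secs: u64,
    locked_by: Option<String>,
    locked_at: Option<u64>,
}

/// Return an interrupted job to the queue without charging it an attempt.
/// `retry_delay_secs` plays the role of interrupted_job_retry_delay (None = no delay).
fn recover_interrupted(job: &mut InterruptedJob, now_secs: u64, retry_delay_secs: Option<u64>) {
    // Undo the increment made at fetch time, never going below zero.
    job.attempts = job.attempts.saturating_sub(1);
    job.locked_by = None;
    job.locked_at = None;
    // ASSUMPTION: the delay is counted from the moment of recovery.
    if let Some(delay) = retry_delay_secs {
        job.run_at_secs = now_secs + delay;
    }
}
```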

Automatic worker recovery is separate and opt-in. When enabled, workers record heartbeats, and the recovery sweeper finds stale worker ids or orphaned locks. Recovered jobs are returned to the queue through the same recovery semantics: the attempt consumed by fetching the job is reversed, locks are cleared, and a recovery delay can be applied before the job is visible again.
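Stale-worker detection can be sketched as a threshold check on heartbeat timestamps. The function names and the stale_after_secs threshold are hypothetical, and treating a lock whose owner has no heartbeat at all as orphaned is an assumption about what "orphaned locks" means here.

```rust
use std::collections::HashMap;

/// Worker ids whose last heartbeat is older than `stale_after_secs`.
fn stale_workers(heartbeats: &HashMap<String, u64>, now_secs: u64, stale_after_secs: u64) -> Vec<String> {
    let mut stale: Vec<String> = heartbeats
        .iter()
        .filter(|&(_, &last_seen)| now_secs.saturating_sub(last_seen) > stale_after_secs)
        .map(|(id, _)| id.clone())
        .collect();
    stale.sort(); // HashMap iteration order is unspecified
    stale
}

/// Job ids locked by a stale worker or by a worker with no heartbeat (ASSUMED orphaned).
/// `locks` maps job id -> locked_by.
fn jobs_to_recover(
    locks: &HashMap<i64, String>,
    heartbeats: &HashMap<String, u64>,
    now_secs: u64,
    stale_after_secs: u64,
) -> Vec<i64> {
    let stale = stale_workers(heartbeats, now_secs, stale_after_secs);
    let mut ids: Vec<i64> = locks
        .iter()
        .filter(|&(_, owner)| stale.contains(owner) || !heartbeats.contains_key(owner))
        .map(|(&id, _)| id)
        .collect();
    ids.sort();
    ids
}
```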

The recovery SQL layer can also fetch locked jobs for a set of worker ids and call the schema's recover_dead_worker_jobs function to recover jobs owned by dead workers.

Workspace Layout

The workspace keeps behavior separated by responsibility:

src/
  lib.rs                 crate exports and top-level module structure
  runner/                worker lifecycle, job loop, execution, release
  batcher/               completion and failure batching
  streams/               job signal and job fetch streams

crates/
  graphile-worker-queries/
    src/add_job/         SQL wrappers for inserting jobs
    src/get_job.rs       single-job locking fetch
    src/batch_get_jobs.rs
                         batched locking fetch for LocalQueue
    src/complete_job.rs  completion persistence
    src/fail_job/        failure persistence
    src/return_jobs/     recovery and interrupted-job return
    src/recover_workers.rs
                         stale-worker recovery queries

This layout keeps most PostgreSQL statements in graphile-worker-queries, while src/runner owns the orchestration decisions: when to fetch, how to execute handlers, which release path to use, and when to flush background components.