Introduction

Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. It lets an application store durable work in PostgreSQL, then process that work in background workers instead of blocking request handling or other latency-sensitive paths.

Use it for work such as sending email, generating documents, running calculations, synchronizing data, cleaning up records, or moving jobs produced by PostgreSQL triggers and functions into Rust code. Jobs are stored in PostgreSQL, task handlers run in Rust, and the worker coordinates fetching, execution, retries, queues, and scheduling.

Relationship to Graphile Worker

Graphile Worker RS is based on the Node.js Graphile Worker project. The core model is the same: PostgreSQL is the durable queue, workers claim available jobs, task identifiers choose the handler, and job metadata controls scheduling, priority, attempts, and queue behavior.

This project is not a Node wrapper. It is a Rust rewrite with Rust APIs, workspace crates, feature flags, and async runtime integration. The root crate is graphile_worker, and the workspace contains focused crates for task handlers, job specifications, context, migrations, cron parsing and running, lifecycle hooks, recovery, database access, runtime adapters, the CLI, and admin tooling.

The Rust implementation is mostly compatible with the original Graphile Worker model, so it can be used alongside the Node.js worker where the shared database schema and job contracts match. One important implementation difference is worker identity: the Rust worker uses one worker id per worker instance, and jobs are processed by the enabled async runtime.

What It Solves

Graphile Worker RS is useful when PostgreSQL is already part of your system and you want background processing without operating a separate queue service. It provides:

  • durable job storage in PostgreSQL
  • efficient job claiming with PostgreSQL locking
  • low-latency wakeups through PostgreSQL notifications
  • typed Rust task handlers through TaskHandler
  • scheduling from Rust through WorkerUtils or from SQL through graphile_worker.add_job
  • delayed jobs, priorities, attempts, job keys, and serial queues
  • cron-like recurring jobs through the crontab crates
  • batch job insertion and batch task handling
  • configurable worker concurrency
  • graceful shutdown behavior for in-flight work
  • optional worker recovery for jobs left locked by failed workers
  • lifecycle hooks, tracing integration, CLI commands, and admin UI crates for operational visibility

The default feature set uses Tokio, rustls, and the SQLx PostgreSQL driver. Feature flags also expose async-std, native TLS, the tokio-postgres driver, and OpenTelemetry compatibility groups.

Guide and API Reference

This mdBook is the narrative guide. Read it when you want to understand how the pieces fit together, choose configuration, follow a task-oriented workflow, or prepare a worker for production.

Start with Getting Started to install the crate, define a first TaskHandler, register it with WorkerOptions, and run a worker. Then use Core Concepts for the mental model around tasks, workers, scheduling, queues, and the database schema.

Use Configuration when selecting runtime, TLS, database driver, shutdown, recovery, and application state options. Use Guides for focused workflows such as scheduling jobs, batch jobs, cron jobs, local queue behavior, worker recovery, lifecycle hooks, and job management. Use Operations for the CLI, admin UI, migrations, deployment, and observability topics.

Use docs.rs when you need generated Rust API documentation: exact method signatures, trait bounds, enum variants, struct fields, feature-gated items, and crate-level rustdoc. The API Reference page links to the generated docs for the main crate and supporting crates, while Crate Map explains what each workspace crate is for.

For executable examples, start with examples/simple.rs, then look at examples/crontab.rs, examples/batch_add_jobs.rs, examples/local_queue.rs, and the hook examples. The integration tests under tests/ are also useful when you need precise behavior for scheduling, queues, worker utilities, migrations, recovery, and runtime driver paths.

Getting Started

Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. Use it when application work should move out of the request path: sending email, generating files, running calculations, scheduling follow-up work, or processing jobs created by PostgreSQL triggers and functions.

This section is an adoption roadmap. It points to the pages that help you move from deciding whether Graphile Worker RS fits your application to running your first worker and exploring the examples.

Adoption Roadmap

  1. Decide whether a PostgreSQL-backed queue is the right model for your work. Start with When to Use Graphile Worker RS.
  2. Add the crate and choose the runtime, TLS, and database driver features your application will use. See Installation.
  3. Run a minimal worker to prove your database connection, migrations, task registration, and job execution flow. Follow Quick Start.
  4. Turn that minimal worker into an application shape you can keep: define task payloads, register handlers, provide a PostgreSQL pool, and decide how the worker should run. See First Worker.
  5. Compare the repository examples with your use case. The Examples Tour is the best next stop once the basic worker is running.

What You Build First

A worker needs three application-level pieces:

  • A serializable payload type.
  • A TaskHandler implementation with a stable IDENTIFIER.
  • WorkerOptions configured with a PostgreSQL pool and registered handlers.

The shape is visible in examples/simple.rs:

use graphile_worker::{IntoTaskHandlerResult, WorkerContext};
use graphile_worker_task_handler::TaskHandler;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}

impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);
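        // Name the error type explicitly so the `impl IntoTaskHandlerResult`
        // return type can be inferred when the handler never returns Err.
        Ok::<(), String>(())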
    }
}

The worker is then configured with a connection pool, concurrency, schema, and registered task:

use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .concurrency(2)
    .schema("example_simple_worker")
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;

worker.run().await?;

Adding Work to the Queue

After initialization, create utilities from the worker and add jobs with a payload and JobSpecBuilder. The simple example schedules a job for later:

use chrono::{offset::Utc, Duration};
use graphile_worker::JobSpecBuilder;

let helpers = worker.create_utils();

helpers
    .add_job(
        SayHello {
            message: "world".to_string(),
        },
        JobSpecBuilder::new()
            .run_at(Utc::now() + Duration::seconds(10))
            .build(),
    )
    .await?;

That flow is enough to validate the core model: enqueue typed work, store it in PostgreSQL, and let the worker execute the matching handler.

Installation Choices

The default crate features enable Tokio, rustls TLS, and the SQLx driver. A basic dependency setup looks like this:

[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

The crate also exposes feature flags for runtime-async-std, tls-native-tls, driver-tokio-postgres, and OpenTelemetry compatibility. Use Installation first, then check Runtime, TLS, and Drivers and Feature Flags when your application needs a non-default combination.

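Where to Go Next

Continue with When to Use Graphile Worker RS to check fit, then follow Installation, Quick Start, First Worker, and the Examples Tour in that order.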

When to Use Graphile Worker RS

Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. Use it when your application already depends on PostgreSQL and you want background work to be stored, coordinated, retried, and scheduled through the same database instead of a separate queue service.

It is based on Graphile Worker and is designed for jobs such as sending emails, performing calculations, generating PDFs, scheduling future work, and running recurring tasks.

Good Fit

Graphile Worker RS is a good fit when you need one or more of these properties:

  • Background jobs that survive process restarts because they are persisted in PostgreSQL.
  • Low-latency job pickup using PostgreSQL LISTEN/NOTIFY.
  • Concurrent job processing coordinated through PostgreSQL SKIP LOCKED.
  • Automatic retries for failed jobs, including exponential backoff.
  • Scheduled jobs that should run later.
  • Cron-like recurring jobs.
  • Type-safe Rust task handlers with serde payloads.
  • A worker that can run inside an existing Rust async application.
  • Optional worker recovery for deployments where jobs may remain locked after a crash, network partition, forced abort, or orchestrator shutdown.
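The retry bullet above mentions exponential backoff without giving numbers. As a rough sketch only: the Node.js Graphile Worker documents a retry delay of exp(min(attempts, 10)) seconds, and the code below assumes the Rust worker follows the same curve, which this page does not confirm. The function name retry_delay_secs is hypothetical and is not part of the crate.

```rust
/// Delay in seconds before the next retry of a job that has already failed
/// `attempts` times.
///
/// Assumption: mirrors the schedule documented for the Node.js Graphile
/// Worker, exp(min(attempts, 10)). This is an illustration, not the crate's
/// code.
fn retry_delay_secs(attempts: u32) -> f64 {
    // Cap the exponent so the delay stops growing after the tenth attempt.
    let capped = attempts.min(10);
    f64::from(capped).exp()
}
```

Under that assumption the first retry waits roughly 2.7 seconds, the third roughly 20 seconds, and every retry from the tenth onward waits about six hours.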

The common case is an application that already writes application data to PostgreSQL and wants to enqueue follow-up work close to that data.

use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("send '{}' to {}", self.subject, self.to);
        Ok::<(), String>(())
    }
}

Typical Use Cases

User-Facing Background Work

Move slow or unreliable work out of HTTP request handling:

  • Send transactional email after signup or payment.
  • Generate a PDF, image, report, or export after the user requests it.
  • Perform expensive calculations after saving the primary database record.
  • Notify another system without blocking the response path.

Scheduled Work

Use scheduled jobs when the job should run at a known time in the future:

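use chrono::{Duration, Utc};
use graphile_worker::JobSpecBuilder;

// Build the payload, set run_at on the spec, then enqueue with WorkerUtils
// (`helpers` is the value returned by worker.create_utils()).
let job = SendEmail {
    to: "customer@example.com".to_owned(),
    subject: "Your report is ready".to_owned(),
};
let spec = JobSpecBuilder::new().run_at(Utc::now() + Duration::hours(1)).build();
helpers.add_job(job, spec).await?;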

Recurring Work

Use cron-like recurring tasks for periodic maintenance or synchronization:

  • Send daily reports.
  • Reconcile records on a schedule.
  • Poll an integration periodically.
  • Clean up old data.

Graphile Worker RS exposes crontab parsing and cron runner types from the main crate, so recurring jobs can live beside the rest of your worker configuration.

Database-Centered Workflows

The crate description explicitly calls out jobs generated by PostgreSQL triggers or functions. This is useful when the database is the source of truth for state transitions and the application needs a Rust worker to process the resulting work queue.

Prerequisites

Before choosing Graphile Worker RS, make sure these assumptions match your project:

  • You can provide a PostgreSQL database connection.
  • Your application can run a Rust async runtime. Tokio is enabled by default, and async-std is available through the runtime-async-std feature.
  • Your job payloads can be serialized and deserialized with serde.
  • You can register every task handler with a stable identifier string.
  • You are comfortable letting the worker manage its own PostgreSQL schema. The default schema name is graphile_worker.
  • You can choose the database driver and TLS features that match your deployment. The default features include Tokio, rustls TLS, and the SQLx driver.

A minimal Tokio setup looks like this:

graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .schema("graphile_worker")
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?
    .run()
    .await?;

When to Use Recovery

Worker recovery is useful for deployments where jobs can be interrupted by events outside the normal graceful shutdown path, such as a process crash, network partition, forced abort, or orchestrator shutdown.

When enabled, workers record heartbeats in PostgreSQL. A sweeper can then find workers that have stopped heartbeating, unlock their jobs, release queue locks, and delay the recovered jobs before retrying them.

use graphile_worker::{WorkerOptions, WorkerRecoveryConfig};
use std::time::Duration;

let recovery = WorkerRecoveryConfig::default()
    .enabled(true)
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30));

let worker = WorkerOptions::default()
    .worker_recovery(recovery)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Recovery is disabled by default, so enable it deliberately for deployments that need stale worker detection and job lock recovery.
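To make the sweep described above more concrete, here is a small in-memory sketch of stale-worker detection. It is not how the crate stores or queries heartbeats (those live in PostgreSQL); the Heartbeat struct and both function names are hypothetical, and only the sweep_threshold and recovery_delay ideas come from the configuration shown earlier.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory view of one worker's most recent heartbeat.
struct Heartbeat {
    worker_id: String,
    last_seen: Instant,
}

/// Return the ids of workers whose last heartbeat is older than
/// `sweep_threshold` when observed at `now`.
fn stale_workers(
    heartbeats: &[Heartbeat],
    now: Instant,
    sweep_threshold: Duration,
) -> Vec<String> {
    heartbeats
        .iter()
        // saturating_duration_since yields zero if last_seen is after now.
        .filter(|hb| now.saturating_duration_since(hb.last_seen) > sweep_threshold)
        .map(|hb| hb.worker_id.clone())
        .collect()
}

/// Earliest time a recovered job may run again: the sweep time plus the
/// configured recovery delay.
fn recovered_run_at(now: Instant, recovery_delay: Duration) -> Instant {
    now + recovery_delay
}
```

With the values from the configuration above, a worker that last reported 400 seconds ago is past the 300 second threshold, so its jobs would be unlocked and delayed by 30 seconds before they become eligible again.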

Bad Fit

Graphile Worker RS may not be the right tool when:

  • Your application does not use PostgreSQL and you do not want PostgreSQL to be part of the job system.
  • You need an in-memory-only queue where jobs can be discarded on restart.
  • You need a general message broker that is independent of your database.
  • Your workers are not Rust applications.
  • Your workload cannot be represented as registered task identifiers with serializable payloads.
  • You need to avoid database-managed queue state entirely.

It is also not a replacement for normal request handling. If work must finish before the response can be sent, keep it in the request path. Use Graphile Worker RS when the work can run asynchronously and be retried independently.

Decision Checklist

Choose Graphile Worker RS when most of these are true:

  • PostgreSQL is already a required dependency.
  • The job must survive application restarts.
  • Failed jobs should be retried automatically.
  • Jobs can be described with a task identifier and a serialized payload.
  • Running the worker in a Rust async process fits your deployment.
  • Keeping queue state near application data is simpler than operating another queue service.

Installation

Graphile Worker RS is published as the graphile_worker crate. It needs a PostgreSQL database and one supported async runtime in your application.

Add the Crate

For the default setup, add the main crate:

cargo add graphile_worker

Or add it to Cargo.toml directly:

[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

The default feature set enables:

  • runtime-tokio
  • tls-rustls
  • driver-sqlx

This is the recommended starting point for Tokio applications using SQLx and rustls TLS.

Tokio

Tokio is enabled by default, so most applications only need to add Tokio with the runtime features they use:

[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

A typical Tokio entrypoint looks like this:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_worker().await
}

async-std

To use async-std, disable default features and enable runtime-async-std. Because the default SQLx driver and rustls TLS are also disabled when default features are disabled, enable them explicitly if you still want that setup:

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
  "runtime-async-std",
  "tls-rustls",
  "driver-sqlx",
] }
async-std = { version = "1", features = ["attributes"] }

Applications using the #[async_std::main] macro need async-std's attributes feature:

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_worker().await
}

Driver and TLS Features

The SQLx PostgreSQL driver is enabled by default through driver-sqlx. The crate also exposes driver-tokio-postgres for Tokio applications.

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
  "runtime-tokio",
  "driver-tokio-postgres",
] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

The driver-tokio-postgres feature enables runtime-tokio and is tested only with the Tokio runtime. Use driver-sqlx for async-std.

TLS is selected separately from the database driver. The available TLS feature flags are:

  • tls-rustls
  • tls-native-tls

When you disable default features, choose the runtime, database driver, and TLS features you need explicitly.

DATABASE_URL

Graphile Worker RS connects to PostgreSQL. Local tests and examples in this repository use a standard PostgreSQL connection string through DATABASE_URL:

export DATABASE_URL='postgres://postgres:postgres@localhost:54233/postgres'

Use the same URL form for your own database, changing the username, password, host, port, and database name as needed.
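In application code, a common way to consume DATABASE_URL is to read it from the environment and fall back to a local default during development. The sketch below shows that pattern with the standard library only. The helper names are hypothetical, and using the local test URL from this page as the default is an assumption; pick whatever default suits your setup.

```rust
/// Local development fallback. Assumption: reuses the test URL shown on this
/// page; replace it with your own default.
const DEFAULT_DATABASE_URL: &str = "postgres://postgres:postgres@localhost:54233/postgres";

/// Prefer a non-empty value taken from the environment, otherwise fall back
/// to the local default.
fn resolve_database_url(env_value: Option<String>) -> String {
    env_value
        .filter(|url| !url.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_DATABASE_URL.to_string())
}

/// Read DATABASE_URL from the process environment and resolve it.
fn database_url_from_env() -> String {
    resolve_database_url(std::env::var("DATABASE_URL").ok())
}
```

Keeping the environment lookup separate from the decision logic makes the fallback easy to test without mutating process-wide environment variables.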

For a worker setup example that creates a SQLx pool and registers jobs, continue to Quick Start. For a complete feature list, see Feature Flags.

Quick Start

This page shows the smallest useful Graphile Worker RS setup: define one typed task, register it with a worker, add a job, and start processing jobs.

Add Dependencies

Graphile Worker RS runs against PostgreSQL. With the default Tokio runtime, rustls TLS, and SQLx driver, a minimal application can connect with a database URL and these dependencies:

[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1", features = ["derive"] }

Define a Typed Task

A task is a Rust type that can be serialized into the job payload and deserialized when the worker runs it. Implement TaskHandler for the payload type and give it a stable identifier.

use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
    body: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending '{}' to {}", self.subject, self.to);
        Ok::<(), String>(())
    }
}

The identifier is the task name stored in PostgreSQL. Jobs with this identifier will be decoded as SendEmail and passed to SendEmail::run.

Create and Run a Worker

Configure a PostgreSQL connection string, register the task with define_job, initialize the worker, and call run.

use graphile_worker::WorkerOptions;

async fn run_worker() -> Result<(), Box<dyn std::error::Error>> {
    let worker = WorkerOptions::default()
        .database_url("postgres://postgres:password@localhost/mydb")
        .concurrency(5)
        .schema("graphile_worker")
        .define_job::<SendEmail>()
        .init()
        .await?;

    worker.run().await?;

    Ok(())
}

init connects to PostgreSQL, runs the worker migrations for the configured schema, registers the task handlers, and returns a Worker. The schema defaults to graphile_worker if you do not set one.

Add a Job

After initialization, use create_utils to get WorkerUtils. Its add_job method accepts the typed task payload and a JobSpec.

use graphile_worker::JobSpecBuilder;

let helpers = worker.create_utils();

helpers
    .add_job(
        SendEmail {
            to: "ada@example.com".to_string(),
            subject: "Welcome".to_string(),
            body: "Thanks for signing up.".to_string(),
        },
        JobSpecBuilder::new().build(),
    )
    .await?;

For scheduled jobs, set run_at on the job spec before building it. This example uses chrono; add it to your application if you schedule from Rust timestamps:

use chrono::{Duration, offset::Utc};
use graphile_worker::JobSpecBuilder;

helpers
    .add_job(
        SendEmail {
            to: "ada@example.com".to_string(),
            subject: "Welcome".to_string(),
            body: "Thanks for signing up.".to_string(),
        },
        JobSpecBuilder::new()
            .run_at(Utc::now() + Duration::seconds(10))
            .build(),
    )
    .await?;

Complete Shape

This combines the pieces into one application shape. In a real application you can add jobs from request handlers, services, or startup code while one or more workers process them.

use graphile_worker::{
    IntoTaskHandlerResult, JobSpecBuilder, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
    body: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending '{}' to {}", self.subject, self.to);
        Ok::<(), String>(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let worker = WorkerOptions::default()
        .database_url("postgres://postgres:password@localhost/mydb")
        .concurrency(5)
        .schema("graphile_worker")
        .define_job::<SendEmail>()
        .init()
        .await?;

    worker
        .create_utils()
        .add_job(
            SendEmail {
                to: "ada@example.com".to_string(),
                subject: "Welcome".to_string(),
                body: "Thanks for signing up.".to_string(),
            },
            JobSpecBuilder::new().build(),
        )
        .await?;

    worker.run().await?;

    Ok(())
}

For a runnable example in the repository, see examples/simple.rs. For more configuration options, continue with Configuration.

First Worker

This page walks through the smallest useful Graphile Worker RS flow:

  1. Create a PostgreSQL pool.
  2. Define a task handler.
  3. Configure WorkerOptions.
  4. Initialize the worker.
  5. Add a job.
  6. Run the worker.
  7. Let shutdown happen gracefully.

The examples use Tokio and SQLx, matching the default setup used by the repository examples.

Define a Handler

A worker processes jobs by matching each job's task identifier to a registered TaskHandler. The handler type is also the payload type, so it should derive Serialize and Deserialize.

use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}

impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);
        Ok::<(), String>(())
    }
}

IDENTIFIER is the task name stored in PostgreSQL. Register the same handler type on any worker that should be able to process jobs with that identifier.
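The identifier-to-handler matching can be pictured as a lookup table keyed by task name. The sketch below is a simplified model using only the standard library. Registry, Handler, and say_hello are hypothetical names, the payload is passed as raw text rather than deserialized, and the behavior for unknown identifiers is an assumption rather than a description of the real worker.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature: receives the raw JSON payload as text.
type Handler = fn(&str) -> Result<String, String>;

/// Minimal registry that maps a task identifier to its handler.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn new() -> Self {
        Registry {
            handlers: HashMap::new(),
        }
    }

    /// Register a handler under a stable identifier.
    fn register(&mut self, identifier: &'static str, handler: Handler) {
        self.handlers.insert(identifier, handler);
    }

    /// Look up the handler for a job's identifier and run it on the payload.
    fn dispatch(&self, identifier: &str, payload: &str) -> Result<String, String> {
        match self.handlers.get(identifier) {
            Some(handler) => handler(payload),
            // Assumption: an unknown identifier is reported as an error.
            None => Err(format!("no handler registered for '{identifier}'")),
        }
    }
}

/// Stand-in for a task body; it only echoes what it received.
fn say_hello(payload: &str) -> Result<String, String> {
    Ok(format!("say_hello received {payload}"))
}
```

This is why the identifier must stay stable: jobs already stored under "say_hello" can only be processed by a worker that still registers a handler under that exact name.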

Create the Pool

You can pass an existing SQLx PostgreSQL pool with pg_pool. This is useful when your application already owns database configuration.

use std::str::FromStr;

use sqlx::postgres::{PgConnectOptions, PgPoolOptions};

let pg_options = PgConnectOptions::from_str(
    "postgres://postgres:root@localhost:5432",
)?;

let pg_pool = PgPoolOptions::new()
    .max_connections(5)
    .connect_with(pg_options)
    .await?;

You can also let Graphile Worker create the pool from a URL with database_url. When using database_url, max_pg_conn controls the pool size and defaults to 20 if it is not set.

let options = graphile_worker::WorkerOptions::default()
    .database_url("postgres://postgres:root@localhost:5432")
    .max_pg_conn(5);

If both an existing database connection and database_url are provided, the existing connection takes precedence.

Configure and Initialize

WorkerOptions is a builder. The usual first-worker options are:

  • concurrency: maximum jobs processed at the same time. If omitted, it defaults to the number of logical CPUs.
  • schema: PostgreSQL schema used for Graphile Worker tables. If omitted, it defaults to graphile_worker.
  • define_job::<T>(): registers a TaskHandler type.
  • pg_pool, database, or database_url: provides the database connection.
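The concurrency default in the list above can be approximated with the standard library. This is a sketch of the rule as stated, not the crate's implementation, which may use a different CPU-counting API; effective_concurrency is a hypothetical helper name.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Resolve the effective concurrency: an explicit setting wins, otherwise
/// use the number of logical CPUs, or 1 if that cannot be determined.
fn effective_concurrency(configured: Option<usize>) -> usize {
    configured.unwrap_or_else(|| {
        thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1)
    })
}
```

Setting concurrency explicitly, as the examples on this page do, keeps behavior predictable across machines with different core counts.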

Calling init().await connects to the database if needed, runs migrations for the selected schema, registers task details, creates the worker id, and returns a ready Worker.

use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .concurrency(2)
    .schema("example_simple_worker")
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Add a Job

After initialization, create utilities from the worker and enqueue a job. JobSpec::default() schedules it with default job options.

use graphile_worker::JobSpec;

let helpers = worker.create_utils();

helpers
    .add_job(
        SayHello {
            message: "world".to_string(),
        },
        JobSpec::default(),
    )
    .await?;

To schedule a job for later, build a JobSpec with JobSpecBuilder.

use chrono::{Duration, Utc};
use graphile_worker::JobSpecBuilder;

helpers
    .add_job(
        SayHello {
            message: "world".to_string(),
        },
        JobSpecBuilder::new()
            .run_at(Utc::now() + Duration::seconds(10))
            .build(),
    )
    .await?;

Run the Worker

run() starts the long-running worker loop. It registers the worker, starts job sources, processes jobs until shutdown is requested, waits for batchers to finish, emits shutdown hooks, and deregisters the worker.

worker.run().await?;

For scripts and smoke tests, run_once() processes currently available jobs and then returns.

worker.run_once().await?;

Full Example

use std::str::FromStr;

use graphile_worker::{
    IntoTaskHandlerResult, JobSpec, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};

#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}

impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);
        Ok::<(), String>(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pg_options = PgConnectOptions::from_str(
        "postgres://postgres:root@localhost:5432",
    )?;

    let pg_pool = PgPoolOptions::new()
        .max_connections(5)
        .connect_with(pg_options)
        .await?;

    let worker = WorkerOptions::default()
        .concurrency(2)
        .schema("example_simple_worker")
        .define_job::<SayHello>()
        .pg_pool(pg_pool)
        .init()
        .await?;

    let helpers = worker.create_utils();

    helpers
        .add_job(
            SayHello {
                message: "world".to_string(),
            },
            JobSpec::default(),
        )
        .await?;

    worker.run().await?;

    Ok(())
}

Shutdown

By default, the worker listens for OS shutdown signals such as Ctrl+C and SIGTERM. When a shutdown signal arrives, in-flight jobs are given a grace period to finish. The default grace period is 5 seconds, and shutdown-aborted jobs are retried after a default delay of 30 seconds.
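The interaction between the grace period and the retry delay can be summarized as a small decision rule. The sketch below is a simplified model, assuming a job's remaining work time is known up front, which a real worker cannot know. ShutdownOutcome and outcome_at_shutdown are hypothetical names, not crate types.

```rust
use std::time::Duration;

/// What happens to an in-flight job once shutdown has been requested.
#[derive(Debug, PartialEq)]
enum ShutdownOutcome {
    /// The job finished inside the grace period.
    Completed,
    /// The job was aborted and becomes eligible again after this delay.
    AbortedRetryAfter(Duration),
}

/// Decide the outcome for a job that still needs `remaining_work` when the
/// shutdown signal arrives.
fn outcome_at_shutdown(
    remaining_work: Duration,
    grace_period: Duration,
    retry_delay: Duration,
) -> ShutdownOutcome {
    if remaining_work <= grace_period {
        ShutdownOutcome::Completed
    } else {
        ShutdownOutcome::AbortedRetryAfter(retry_delay)
    }
}
```

With the defaults described above, a job that needs 3 more seconds completes, while a job that needs 8 more seconds is aborted and retried after 30 seconds.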

If the host application already owns shutdown handling, pass a custom shutdown future and disable OS signal listeners:

use graphile_worker::WorkerShutdownConfig;
use std::time::Duration;

let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(false)
    .grace_period(Duration::from_secs(10))
    .interrupted_job_retry_delay(Duration::from_secs(30))
    .shutdown_signal(async {
        // Complete this future when your application wants the worker to stop.
    });

let worker = WorkerOptions::default()
    .worker_shutdown(shutdown)
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;

worker.run().await?;

The same settings can also be configured directly on WorkerOptions with listen_os_shutdown_signals, shutdown_signal, shutdown_grace_period, and shutdown_interrupted_job_retry_delay.

Next, see Scheduling Jobs for job timing options and Worker Options for more configuration details.

Examples Tour

The repository includes runnable examples that show how the worker APIs fit together in real programs. This page tours each file under examples/ and calls out what to learn from it.

Most examples connect to PostgreSQL, initialize a worker, add one or more jobs, and then run the worker. Several examples fall back to postgres://postgres:root@localhost:5432; others read DATABASE_URL first.

Basic worker flow

examples/simple.rs

Start here if you want the smallest complete worker shape:

  1. Define a serializable task payload.
  2. Implement TaskHandler with an IDENTIFIER.
  3. Build WorkerOptions, register the task, attach a PostgreSQL pool, and call init().
  4. Use worker.create_utils() to enqueue work.
  5. Call worker.run() to process jobs continuously.

The task intentionally fails about 70% of the time so you can observe retry behavior:

impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);

        if rand::rng().random_range(0..100) < 70 {
            return Err("Failed".to_string());
        }

        Ok(())
    }
}

It also schedules its first job for 10 seconds in the future with JobSpecBuilder::run_at.

examples/run_once.rs

Use this example when you want a bounded worker pass instead of a long-running process. It schedules 10 SayHello jobs, then calls:

worker.run_once().await.unwrap();

This is useful for learning the difference between continuous workers and a single processing pass.

Sharing application state

examples/app_state.rs

This example shows how to attach application-owned state to the worker and read it from task handlers.

AppState wraps an Arc<AtomicUsize>, is installed with WorkerOptions::add_extension, and is retrieved from the task context:

let app_state = ctx.get_ext::<AppState>().unwrap();
let run_count = app_state.increment_run_count();
println!("Run count: {run_count}");

The file also combines normal enqueued jobs with a cron definition using Cron::every_minute::<ShowRunCount>().fill(CrontabFill::hours(1)), so it is a compact example of state plus recurring jobs.
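For readers who want to see what such a state type might look like, here is a plausible reconstruction based only on the description above: a cloneable wrapper around Arc<AtomicUsize> with an increment_run_count method. The field name, the derives, and the memory ordering are assumptions; check examples/app_state.rs for the real definition.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical reconstruction of the example's shared state. Clones share
/// the same counter because they share the same Arc.
#[derive(Clone, Default)]
struct AppState {
    run_count: Arc<AtomicUsize>,
}

impl AppState {
    /// Atomically add one to the counter and return the new value.
    fn increment_run_count(&self) -> usize {
        // fetch_add returns the previous value, so add one for the new count.
        self.run_count.fetch_add(1, Ordering::SeqCst) + 1
    }
}
```

Because the counter sits behind an Arc, every handler invocation that retrieves the extension sees and updates the same count, even when jobs run concurrently.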

Scheduling work from a handler

examples/context_helpers.rs

This example demonstrates WorkerContextExt helper methods inside a running task. SendWs represents an initial operation, then schedules a CheckWs follow-up job from inside run:

ctx.add_job(
    CheckWs {
        request_id: self.request_id.clone(),
    },
    JobSpecBuilder::new()
        .run_at(Utc::now() + Duration::seconds(10))
        .build(),
)
.await?;

Use this pattern for workflows where one job needs to create the next step after it succeeds.

Adding jobs in batches

examples/batch_add_jobs.rs

This file compares two batch insertion APIs:

  • add_jobs::<SendEmail>(&emails) for many jobs with the same typed payload.
  • add_raw_jobs(&mixed_jobs) for heterogeneous jobs where each item provides an identifier, JSON payload, and JobSpec.

The raw batch includes both send_email and process_payment jobs, and uses JobSpecBuilder::priority(-10) for one urgent payment job:

let mixed_jobs = vec![
    RawJobSpec {
        identifier: "send_email".into(),
        payload: json!({ "to": "dave@example.com", "subject": "Notification" }),
        spec: JobSpec::default(),
    },
    RawJobSpec {
        identifier: "process_payment".into(),
        payload: json!({ "user_id": 123, "amount": 50 }),
        spec: JobSpecBuilder::new().priority(-10).build(),
    },
];

Use typed batches when all payloads share one handler type. Use raw batches when you need one database insert for multiple task identifiers.
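The -10 priority on the payment job only makes sense if numerically lower priorities run first, as they do in the Node.js Graphile Worker. The sketch below models that ordering in memory, assuming the Rust worker sorts the same way (priority ascending, then scheduled time). QueuedJob and next_job_order are hypothetical names, and the real ordering is decided in SQL, not in Rust.

```rust
/// Hypothetical, simplified view of a queued job.
#[derive(Debug, Clone, PartialEq)]
struct QueuedJob {
    identifier: &'static str,
    priority: i32,
    run_at_secs: u64,
}

/// Order jobs the way a worker is assumed to pick them: lower priority
/// number first, then earlier run_at.
fn next_job_order(mut jobs: Vec<QueuedJob>) -> Vec<QueuedJob> {
    jobs.sort_by_key(|job| (job.priority, job.run_at_secs));
    jobs
}
```

Under this model the process_payment job with priority -10 is picked before a send_email job with the default priority, even if the email was scheduled earlier.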

Cron jobs

examples/crontab.rs

This example focuses on recurring jobs. It registers two task types and defines a cron entry for SayHello:

Cron::every_n_minutes::<SayHello>(2)
    .unwrap()
    .fill(CrontabFill::minutes(10))
    .job_key("say_hello_dedupe")
    .job_key_mode(CronJobKeyMode::PreserveRunAt)
    .payload(SayHello {
        message: "Crontab".to_string(),
    })
    .unwrap()

The example shows how to set:

  • the interval with every_n_minutes;
  • backfill with CrontabFill;
  • a job_key for deduplication;
  • CronJobKeyMode::PreserveRunAt;
  • a typed payload for the recurring job.

See also Cron Jobs for a broader explanation of recurring work.

Plugins and hooks

examples/hooks.rs

This is the entry point for the hooks example. It wires together:

  • ProcessData, the task being processed;
  • MetricsPlugin, which observes worker and job lifecycle events;
  • ValidationPlugin, which can continue, skip, or fail a job before it runs;
  • logging::enable_logs, a small tracing setup.

It enqueues four jobs so you can see each path: normal processing, skip, forced failure, and another normal job.

examples/hooks/process_data.rs

This file defines the payload used by the hooks example:

pub(super) struct ProcessData {
    pub(super) value: i32,
    #[serde(default)]
    pub(super) skip: bool,
    #[serde(default)]
    pub(super) force_fail: bool,
}

The handler itself only prints the value. The interesting behavior comes from plugins inspecting skip and force_fail.

examples/hooks/validation.rs

ValidationPlugin registers a BeforeJobRun hook. It reads the raw JSON payload from the hook context:

  • when skip is true, it returns HookResult::Skip;
  • when force_fail is true, it returns HookResult::Fail(...);
  • otherwise it returns HookResult::Continue.

This is the file to study when a plugin needs to make a pre-run decision.
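
The decision logic can be sketched without the crate. The Decision enum and Flags struct below are local stand-ins, not the crate's HookResult or hook context types, and checking skip before force_fail is an assumption about ordering.

```rust
/// Local stand-in for the hook outcome; not the crate's own type.
#[derive(Debug, PartialEq)]
pub enum Decision {
    Continue,
    Skip,
    Fail(String),
}

/// The two payload flags the validation step inspects.
pub struct Flags {
    pub skip: bool,
    pub force_fail: bool,
}

/// Checks `skip` first, then `force_fail`, and otherwise lets the job run.
pub fn decide(flags: &Flags) -> Decision {
    if flags.skip {
        Decision::Skip
    } else if flags.force_fail {
        Decision::Fail("force_fail was set in the payload".to_string())
    } else {
        Decision::Continue
    }
}
```

Keeping the decision in a pure function like this makes the pre-run rule easy to unit test separately from the hook registration.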

examples/hooks/metrics.rs

MetricsPlugin registers lifecycle hooks for:

  • WorkerStart;
  • WorkerShutdown;
  • JobStart;
  • JobComplete;
  • JobFail.

It keeps atomic counters for started, completed, and failed jobs, then prints final counts during shutdown.

examples/hooks/logging.rs

This helper installs a tracing_subscriber registry with a debug-level filter and sqlx=warn. It is intentionally small so the hook examples can focus on plugin behavior.

Local queue behavior

examples/local_queue.rs

This example shows how to enable and observe local queue batching. It configures the worker with:

.local_queue(
    LocalQueueConfig::default()
        .with_size(50)
        .with_ttl(Duration::from_secs(60))
        .with_refetch_delay(
            RefetchDelayConfig::default()
                .with_duration(Duration::from_millis(100))
                .with_threshold(5)
                .with_max_abort_threshold(20),
        ),
)

The plugin defined in this example logs local-queue hooks such as initialization, mode changes, jobs fetched, jobs returned, refetch delay start, delay abort, and delay expiry. The example enqueues 20 short ProcessItem jobs so you can watch batch fetching and refetch-delay behavior in the logs.

Worker utilities in a web server

The sendable_worker example is split across several files. Together they show that a worker can run beside an HTTP server while WorkerUtils is shared with request handlers.

examples/sendable_worker.rs

This is the example entry point. It:

  • builds a worker with Worker::options();
  • registers ExampleTask and DatabaseTask;
  • wraps worker.create_utils() in Arc;
  • binds a TCP listener on 127.0.0.1:3000;
  • spawns the worker with tokio::spawn;
  • spawns a simple HTTP accept loop;
  • starts a second worker against the same database URL.

The application waits with tokio::select! for a worker task, server task, secondary worker task, or Ctrl+C.

examples/sendable_worker/tasks.rs

This file defines the two task handlers used by the HTTP example:

  • ExampleTask logs a name and value, sleeps for 100 milliseconds, and succeeds.
  • DatabaseTask uses ctx.pg_pool() to run SELECT COUNT(*) FROM graphile_worker.jobs and logs the current job count.

Study this file when a task needs access to the worker's PostgreSQL pool.

examples/sendable_worker/http.rs

This file contains the request dispatcher. It reads the HTTP request line, extracts the method and path, and routes:

  • GET / and GET /health;
  • POST /schedule/example?...;
  • POST /schedule/database?...;
  • everything else to 404 Not Found.

The HTTP parsing is deliberately minimal: it does just enough to demonstrate sharing WorkerUtils with request handlers.
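
A dispatcher of this kind can be sketched in a few lines. The Route enum and route function are hypothetical names, and treating GET / and GET /health as the same route is an assumption; the example file may handle them separately.

```rust
/// Routes named in the text; the enum itself is hypothetical.
#[derive(Debug, PartialEq)]
pub enum Route<'a> {
    Health,
    ScheduleExample { query: &'a str },
    ScheduleDatabase { query: &'a str },
    NotFound,
}

/// Parses a request line such as "POST /schedule/example?name=a HTTP/1.1".
pub fn route(request_line: &str) -> Route<'_> {
    let mut parts = request_line.split_whitespace();
    let (Some(method), Some(target)) = (parts.next(), parts.next()) else {
        // A malformed request line has no usable method or path.
        return Route::NotFound;
    };
    // Separate the path from the optional query string.
    let (path, query) = target.split_once('?').unwrap_or((target, ""));
    match (method, path) {
        ("GET", "/") | ("GET", "/health") => Route::Health,
        ("POST", "/schedule/example") => Route::ScheduleExample { query },
        ("POST", "/schedule/database") => Route::ScheduleDatabase { query },
        _ => Route::NotFound,
    }
}
```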

examples/sendable_worker/http/schedule.rs

This file schedules jobs from HTTP routes.

schedule_example_task parses name and value, then calls utils.add_job(ExampleTask { name, value }, Default::default()).

schedule_database_task requires a query parameter and uses a richer JobSpecBuilder:

let job_spec = JobSpecBuilder::new()
    .priority(-10)
    .run_at(chrono::Utc::now() + chrono::Duration::seconds(10))
    .job_key(format!("db_task_{}", chrono::Utc::now().timestamp()))
    .build();

This demonstrates scheduling from request input with priority, delay, and job key options.

examples/sendable_worker/http/query.rs

This helper parses query parameters into a HashMap<String, String> and applies a small percent-decoding function for the encoded characters used by the example routes.
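
A helper like this needs only the standard library. The sketch below is one possible shape, not the example's code: the function names are hypothetical, only %XX escapes are decoded, and malformed escapes are passed through unchanged.

```rust
use std::collections::HashMap;

/// Decodes %XX escapes; malformed escapes are kept literally.
pub fn percent_decode(input: &str) -> String {
    // Converts one ASCII hex digit into its numeric value.
    let hex = |b: u8| (b as char).to_digit(16).map(|d| d as u8);
    let bytes = input.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            if let (Some(hi), Some(lo)) = (hex(bytes[i + 1]), hex(bytes[i + 2])) {
                out.push(hi * 16 + lo);
                i += 3;
                continue;
            }
        }
        out.push(bytes[i]);
        i += 1;
    }
    // Invalid UTF-8 produced by decoding is replaced rather than rejected.
    String::from_utf8_lossy(&out).into_owned()
}

/// Splits "a=1&b=2" into a map; keys without '=' get an empty value.
pub fn parse_query(query: &str) -> HashMap<String, String> {
    query
        .split('&')
        .filter(|pair| !pair.is_empty())
        .map(|pair| {
            let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
            (percent_decode(key), percent_decode(value))
        })
        .collect()
}
```

A production service would normally use an established URL parsing crate instead; the hand-rolled version only keeps the example free of extra dependencies.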

examples/sendable_worker/http/response.rs

This helper writes a plain-text HTTP response to a TcpStream, including status line, content length, content type, and body.

Core Concepts

Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. The core mental model is:

  1. Your application describes work as typed tasks.
  2. It adds jobs for those tasks into PostgreSQL.
  3. A worker fetches runnable jobs, executes the matching task handler, and records the result back in PostgreSQL.

The database is the coordination point. Jobs, task metadata, queue state, worker state, retry metadata, and scheduling data live in the Graphile Worker schema, which defaults to graphile_worker.

The Main Pieces

Tasks

A task is the Rust type that defines what a job does. It implements TaskHandler, provides a stable task identifier, accepts a serializable payload, and contains the async run method for the work.

use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending email to {}", self.to);
        Ok::<(), String>(())
    }
}

Register task handlers on WorkerOptions before starting the worker:

let worker = graphile_worker::WorkerOptions::default()
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

For more detail, see Tasks and Payloads.

Jobs

A job is a stored request to run a task. It includes the task identifier, payload, state, and execution metadata. Job specifications can also carry options such as priority, retry behavior, scheduling information, and queue selection.

Jobs are stored in PostgreSQL, so application processes can enqueue work and worker processes can execute it independently. WorkerUtils provides utility operations for managing jobs, such as adding, removing, and rescheduling them.

Workers

A worker is the runtime process that polls PostgreSQL, locks runnable jobs, and executes registered handlers with the configured concurrency. Graphile Worker RS uses PostgreSQL features such as SKIP LOCKED for efficient job fetching and LISTEN/NOTIFY for low-latency wakeups.

graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?
    .run()
    .await?;

Workers also own lifecycle concerns such as graceful shutdown. Optional recovery settings can record worker heartbeats and recover jobs locked by workers that no longer heartbeat.

For more detail, see Worker Lifecycle and Architecture.

Scheduling

Scheduling determines when a job becomes runnable. A job can be available immediately or scheduled for a later time. Graphile Worker RS also exposes cron support through Cron and CronBuilder for recurring work.

Use scheduling when the time a job should run matters more than when it was created, such as sending a reminder later or running a daily report.

For more detail, see Scheduling Model.

Queues and Concurrency

Concurrency controls how many jobs a worker may process at once. Queues add coordination around groups of jobs. They are useful when some work must be serialized or separated from other work.

The database schema includes _private_job_queues, which tracks queue names for serialized job execution. Worker configuration controls the amount of parallel work a process can take on.

For more detail, see Queues and Concurrency.

Database Schema

Graphile Worker RS manages its own PostgreSQL schema and migrations. The default schema name is graphile_worker, and the crate documentation identifies these core tables:

  • _private_jobs stores job data, state, and execution metadata.
  • _private_tasks tracks registered task types.
  • _private_job_queues manages queue names for serialized job execution.
  • _private_workers tracks active worker instances.

For more detail, see Database Schema.

How The Pieces Fit Together

In a typical application:

  1. Define one Rust type per task and implement TaskHandler.
  2. Configure a WorkerOptions value with a PostgreSQL pool and task definitions.
  3. Initialize the worker so migrations and setup can run.
  4. Add jobs from application code or utilities.
  5. Run one or more worker processes to execute runnable jobs.

The important boundary is between tasks and jobs. A task is code compiled into your application. A job is data stored in PostgreSQL that says which task should run, with which payload, and under which execution rules.
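
The task/job boundary can be illustrated with a toy, in-memory registry. Nothing here is the crate's API: Job, Registry, Handler, and send_email are hypothetical names, and payloads are plain strings instead of deserialized types.

```rust
use std::collections::HashMap;

/// A job is plain data: which task to run and with which payload.
pub struct Job {
    pub identifier: String,
    pub payload: String,
}

/// A task is code: here, a function pointer keyed by identifier.
pub type Handler = fn(&str) -> Result<String, String>;

#[derive(Default)]
pub struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Registers compiled code under a stable identifier.
    pub fn define(mut self, identifier: &'static str, handler: Handler) -> Self {
        self.handlers.insert(identifier, handler);
        self
    }

    /// Looks up the handler by the job's identifier and runs it.
    pub fn run(&self, job: &Job) -> Result<String, String> {
        match self.handlers.get(job.identifier.as_str()) {
            Some(handler) => handler(&job.payload),
            None => Err(format!("no handler registered for {}", job.identifier)),
        }
    }
}

/// Example task body; a real handler would deserialize the payload first.
pub fn send_email(payload: &str) -> Result<String, String> {
    Ok(format!("sent email using payload {payload}"))
}
```

The job only names the task. If the matching code is not registered in the running process, the job cannot be executed there, which is why identifiers need to stay stable across deployments.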

Where To Go Next

Start with Architecture for the system-level view, then read Tasks and Payloads and Worker Lifecycle to understand the main programming model. After that, use Scheduling Model, Queues and Concurrency, and Database Schema for the specific behavior you need to configure or operate.

Architecture

Graphile Worker RS is a PostgreSQL-backed job queue. Application code writes jobs into the Graphile Worker schema, worker processes lock ready jobs with PostgreSQL row locks, task handlers run in the configured async runtime, and the worker persists completion, retry, failure, or recovery state back to the database.

The public crate is split into a small set of user-facing concepts:

  • WorkerOptions configures a worker, registers task handlers, and creates a Worker.
  • Worker runs the job loop until shutdown.
  • WorkerUtils adds and manages jobs outside the worker loop.
  • TaskHandler defines the Rust code that handles a job payload.
  • JobSpec configures scheduling, queues, attempts, priority, flags, and job keys for inserted jobs.

Internally, the root crate coordinates smaller workspace crates. The query crate owns SQL construction, the job/task/spec crates define shared data structures, the lifecycle hook crate exposes extension points, and the runtime facade keeps the worker logic independent of a specific async runtime feature.

Data Flow

The normal path from enqueue to release is:

  1. Application code adds a job through WorkerUtils or the lower-level add_job SQL wrapper.
  2. PostgreSQL stores the job in the worker schema and emits the database notification used by listeners.
  3. A running worker receives a signal from LISTEN jobs:insert, periodic polling, run_once, or an internal local-queue signal.
  4. Worker tasks fetch ready jobs with for update skip locked, filtered to the task identifiers registered in this worker.
  5. The worker builds a WorkerContext, runs the matching TaskHandler, and catches task errors and panics.
  6. The release path deletes completed jobs, reschedules retryable failures, or returns interrupted jobs for recovery.

application
  |
  | add_job(identifier, payload, JobSpec)
  v
PostgreSQL worker schema
  |
  | LISTEN/NOTIFY or polling
  v
Worker job loop
  |
  | get_job / batch_get_jobs
  v
TaskHandler::run(WorkerContext)
  |
  +-- success ----------> complete_job: delete job, unlock queue
  |
  +-- task failure -----> fail_job: save error, unlock, retry later
  |
  +-- shutdown abort ---> recovery return: decrement attempt, unlock, delay

Adding Jobs

The query layer inserts jobs by calling the schema's add_job database function with the task identifier, JSON payload, and JobSpec fields:

let job = worker
    .create_utils()
    .add_raw_job(
        "send_email",
        serde_json::json!({ "to": "a@example.com" }),
        graphile_worker::JobSpec::default(),
    )
    .await?;

The SQL wrapper passes these options to PostgreSQL:

  • identifier: the task name registered by a handler.
  • payload: the JSON payload stored with the job.
  • queue_name: optional queue serialization key.
  • run_at: optional scheduled time.
  • max_attempts: optional retry limit.
  • job_key and job_key_mode: optional job de-duplication/update behavior.
  • priority: lower values are fetched first.
  • flags: labels that workers may skip through forbidden_flags.

When tracing context is active, the insert path can add trace information into the payload before writing it. If use_local_time is enabled and no run_at is supplied, the Rust process time is used as the job's run time; otherwise the SQL layer uses database time.
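
The run-time selection rule reads naturally as a three-way match. This is a sketch of the rule as described, with a hypothetical RunAt enum and resolve_run_at function; it is not the crate's internal representation.

```rust
use std::time::SystemTime;

/// Where the job's run time comes from.
#[derive(Debug, PartialEq)]
pub enum RunAt {
    /// The caller supplied run_at explicitly.
    Explicit(SystemTime),
    /// No run_at and use_local_time enabled: the Rust process clock is used.
    Local(SystemTime),
    /// No run_at and use_local_time disabled: PostgreSQL picks the time.
    Database,
}

/// An explicit run_at always wins; otherwise the flag picks the clock.
pub fn resolve_run_at(
    run_at: Option<SystemTime>,
    use_local_time: bool,
    local_now: SystemTime,
) -> RunAt {
    match (run_at, use_local_time) {
        (Some(at), _) => RunAt::Explicit(at),
        (None, true) => RunAt::Local(local_now),
        (None, false) => RunAt::Database,
    }
}
```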

Fetching Jobs

Workers do not scan every job type. Each worker resolves the task identifiers it can run during initialization and fetches only jobs whose task_id is in that set. The fetch query requires:

  • is_available = true
  • run_at <= now(), or the supplied local timestamp
  • no forbidden flags selected by this worker
  • an available queue when the job belongs to a named queue

Jobs are ordered by priority asc, run_at asc, then locked with:

for update
skip locked

After selecting a row, the same query increments attempts, sets locked_by to the worker id, and records locked_at. For queued jobs, the related job_queues row is also locked by the worker. This is the core coordination mechanism that lets multiple worker processes share the same PostgreSQL queue without taking the same job.
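
The selection rules can be modeled in memory to make the filtering and ordering concrete. This sketch is not the SQL the crate runs: JobRow, FetchParams, and next_job are hypothetical, times are plain integers, and no locking or attempt counting is performed.

```rust
use std::collections::HashSet;

#[derive(Clone, Debug)]
pub struct JobRow {
    pub id: u64,
    pub task: &'static str,
    pub queue: Option<&'static str>,
    pub priority: i32,
    pub run_at: u64, // seconds since an arbitrary epoch
    pub is_available: bool,
    pub flags: Vec<&'static str>,
}

pub struct FetchParams<'a> {
    pub now: u64,
    pub supported_tasks: &'a HashSet<&'static str>,
    pub forbidden_flags: &'a HashSet<&'static str>,
    pub locked_queues: &'a HashSet<&'static str>,
}

/// Returns the id of the job a worker would pick next, if any.
pub fn next_job(jobs: &[JobRow], p: &FetchParams<'_>) -> Option<u64> {
    jobs.iter()
        // Only jobs that are available and already due.
        .filter(|j| j.is_available && j.run_at <= p.now)
        // Only tasks this worker has a handler for.
        .filter(|j| p.supported_tasks.contains(j.task))
        // Skip jobs carrying any flag this worker forbids.
        .filter(|j| !j.flags.iter().any(|f| p.forbidden_flags.contains(f)))
        // A job in a named queue needs that queue to be unlocked.
        .filter(|j| j.queue.map_or(true, |q| !p.locked_queues.contains(q)))
        // Lower priority values first, then earlier run_at.
        .min_by_key(|j| (j.priority, j.run_at))
        .map(|j| j.id)
}
```

In PostgreSQL the same selection runs inside one statement with row locks, so two workers evaluating it concurrently skip each other's locked rows instead of racing.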

Worker Loop

Worker::run() performs the runtime lifecycle in this order:

  1. Register the worker when recovery support is enabled.
  2. Emit worker start hooks.
  3. Spawn recovery background tasks.
  4. Run the crontab scheduler and job runner together.
  5. Wait for completion and failure batchers to flush on shutdown.
  6. Stop recovery tasks, emit shutdown hooks, and deregister the worker.

The job runner listens for job signals. A signal can come from the PostgreSQL listener, the polling interval, run_once, or the local queue. Each signal is fanned out to the configured concurrency so idle worker tasks can attempt to fetch jobs. Signals are wakeup hints rather than durable work items, so the dispatcher coalesces them when worker tasks are already saturated. This keeps PostgreSQL notification listeners drained while polling still provides the fallback path for missed or future work.
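
Signal coalescing can be demonstrated with a bounded channel of capacity one. This is a synchronous, std-only sketch of the idea; the crate's dispatcher is async and its implementation is not shown here. Waker and wakeup_channel are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Sending half of a coalescing wakeup channel.
#[derive(Clone)]
pub struct Waker(SyncSender<()>);

impl Waker {
    /// Records a wakeup hint. Returns false when the hint was merged into
    /// one that is already pending, or when the receiver is gone.
    pub fn wake(&self) -> bool {
        match self.0.try_send(()) {
            Ok(()) => true,
            // A hint is already queued, so this one adds no information.
            Err(TrySendError::Full(_)) => false,
            // Nobody is listening any more.
            Err(TrySendError::Disconnected(_)) => false,
        }
    }
}

/// Creates a channel that holds at most one pending wakeup.
pub fn wakeup_channel() -> (Waker, Receiver<()>) {
    let (tx, rx) = sync_channel(1);
    (Waker(tx), rx)
}
```

Dropping extra hints is safe only because the hint carries no job data. The consumer still has to query PostgreSQL after waking, and polling covers anything a dropped hint might have pointed at.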

There are two fetch modes:

  • Direct mode fetches one job per worker task with get_job.
  • Local queue mode prefetches jobs with batch_get_jobs and wakes worker tasks through an internal signal when cached jobs are available.

Both modes end in the same execution and release logic.

Running a Task

After a job is fetched, the worker looks up the registered handler by the job's task identifier. If the identifier cannot be resolved or no handler function is registered for it, the job is released through the failure path.

For a valid job, the worker creates a WorkerContext containing the shared job, database handle, schema, worker id, extensions, task details, and time mode. It then runs the handler future:

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        // use ctx and the deserialized payload here
        Ok::<(), String>(())
    }
}

The execution path records task duration for successful jobs, catches panics, and watches the worker shutdown signal. If shutdown is requested, the worker waits for the configured grace period. A task that has not finished by then is treated as aborted and goes through the recovery release path.

Completion and Failure

Successful jobs are completed by deleting the job row. If the job belongs to a queue, completion also clears locked_by and locked_at on the queue row, but only when that queue is locked by the same worker id.

Failed jobs are updated instead of deleted:

  • last_error is set to the persisted error message.
  • run_at is moved forward by exponential backoff based on attempts.
  • locked_by and locked_at are cleared.
  • a replacement payload may be written when the handler returned one.
  • queued jobs also unlock their queue row.

The worker decides whether a failure will retry by comparing attempts with max_attempts. Retryable failures emit job-fail hooks; exhausted jobs emit permanent-failure hooks. The SQL update has the same shape in both cases: a permanently failed job stays unlocked with its last error recorded, and the schema's availability rules stop it from being fetched again once its attempt limit is reached.
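
The retry decision and backoff can be sketched as two small functions. The delay formula below is an assumption: it mirrors the upstream Node.js Graphile Worker rule of exp(min(attempts, 10)) seconds, and the Rust crate's SQL should be checked before relying on exact values. The function and enum names are hypothetical.

```rust
use std::time::Duration;

/// Delay before the next attempt.
/// ASSUMPTION: exp(min(attempts, 10)) seconds, as in upstream Graphile Worker.
pub fn retry_delay(attempts: u32) -> Duration {
    let capped = attempts.min(10) as f64;
    Duration::from_secs_f64(capped.exp())
}

#[derive(Debug, PartialEq)]
pub enum FailureKind {
    /// attempts < max_attempts: the job runs again after the delay.
    Retryable,
    /// The attempt limit is reached: the row is kept but not fetched again.
    Permanent,
}

/// Compares the attempts already used with the job's limit.
pub fn classify_failure(attempts: u32, max_attempts: u32) -> FailureKind {
    if attempts < max_attempts {
        FailureKind::Retryable
    } else {
        FailureKind::Permanent
    }
}
```

Under this assumed formula the first retry waits roughly 2.7 seconds, and the delay stops growing after the tenth attempt.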

Completion and ordinary failures can be batched. The completion and failure batchers receive release requests through bounded channels, flush them after the configured delay, and emit hooks only for rows that were actually persisted. If a batcher is already closed, the worker falls back to direct persistence for that job.

Shutdown and Recovery

Shutdown aborts are intentionally not normal task failures. When a running job is interrupted by the shutdown grace-period timeout, the worker applies job recovery instead of failure backoff:

  • attempts is decremented with a floor of zero.
  • locked_by and locked_at are cleared.
  • run_at is delayed by interrupted_job_retry_delay when configured.
  • queued jobs also unlock their queue row.
  • interruption hooks are emitted when recovery handled the job.

Automatic worker recovery is separate and opt-in. When enabled, workers record heartbeats, and the recovery sweeper finds stale worker ids or orphaned locks. Recovered jobs are returned to the queue through the same recovery semantics: the attempt consumed by fetching the job is reversed, locks are cleared, and a recovery delay can be applied before the job is visible again.
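
The recovery semantics amount to a small state change on the job row. The sketch below models it in memory with hypothetical names (LockedJob, return_interrupted) and integer timestamps; setting run_at to the current time plus the delay is an assumption about how the delay is applied.

```rust
use std::time::Duration;

#[derive(Debug, Clone, PartialEq)]
pub struct LockedJob {
    pub attempts: u32,
    pub locked_by: Option<String>,
    pub locked_at_secs: Option<u64>,
    pub run_at_secs: u64,
}

/// Returns an interrupted job to the queue without counting the attempt.
pub fn return_interrupted(
    mut job: LockedJob,
    now_secs: u64,
    retry_delay: Option<Duration>,
) -> LockedJob {
    // Reverse the attempt consumed by fetching, never going below zero.
    job.attempts = job.attempts.saturating_sub(1);
    // Clear the lock so any worker can pick the job up again.
    job.locked_by = None;
    job.locked_at_secs = None;
    // ASSUMPTION: when a delay is configured, the job reappears at now + delay.
    if let Some(delay) = retry_delay {
        job.run_at_secs = now_secs + delay.as_secs();
    }
    job
}
```

Unlike the failure path, no error is recorded and no backoff is applied, so an interrupted job does not move closer to its attempt limit.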

The recovery SQL layer can also fetch locked jobs for a set of worker ids and call the schema's recover_dead_worker_jobs function to recover jobs owned by dead workers.

Workspace Layout

The workspace keeps behavior separated by responsibility:

src/
  lib.rs                 crate exports and top-level module structure
  runner/                worker lifecycle, job loop, execution, release
  batcher/               completion and failure batching
  streams/               job signal and job fetch streams

crates/
  graphile-worker-queries/
    src/add_job/         SQL wrappers for inserting jobs
    src/get_job.rs       single-job locking fetch
    src/batch_get_jobs.rs
                         batched locking fetch for LocalQueue
    src/complete_job.rs  completion persistence
    src/fail_job/        failure persistence
    src/return_jobs/     recovery and interrupted-job return
    src/recover_workers.rs
                         stale-worker recovery queries

This layout keeps most PostgreSQL statements in graphile-worker-queries, while src/runner owns the orchestration decisions: when to fetch, how to execute handlers, which release path to use, and when to flush background components.

Tasks and Payloads

Tasks are the typed boundary between queued JSON jobs and your Rust code. A task is a Rust payload type that implements TaskHandler; Graphile Worker RS uses the task identifier stored with the job to select the handler, deserializes the job payload into that type, and then calls run.

TaskHandler

A normal task handler needs four pieces:

  • a payload type that implements Serialize and Deserialize
  • a globally unique IDENTIFIER
  • an async run method
  • a registered job definition on the worker

use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending email to {} with subject: {}", self.to, self.subject);
        Ok::<(), String>(())
    }
}

run receives self as the deserialized payload. Returning () marks the job as complete. Returning Result<(), E> completes the job on Ok(()) and fails it on Err(error), recording the error's debug representation as the failure message.

Register the handler before running the worker:

let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

For reusable modules, expose definitions instead of making callers know every type:

use graphile_worker::{JobDefinition, TaskHandler};

pub fn jobs() -> [JobDefinition; 2] {
    [SendEmail::definition(), SendDailyReport::definition()]
}

Identifiers

IDENTIFIER is the string stored with the job. It must be unique across all task types registered by your application, because the worker uses it to match queued jobs to handlers.

Prefer stable, descriptive, lowercase names:

impl TaskHandler for ProcessPayment {
    const IDENTIFIER: &'static str = "process_payment";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        Ok::<(), String>(())
    }
}

When adding typed jobs, the identifier comes from the task type:

helpers
    .add_job(
        SendEmail {
            to: "alice@example.com".into(),
            subject: "Welcome!".into(),
        },
        JobSpecBuilder::new().build(),
    )
    .await?;

When adding raw jobs, you provide the identifier and JSON payload yourself. This is useful for heterogeneous batches, but it skips the compile-time connection between payload type and handler:

utils
    .add_raw_jobs(&[
        RawJobSpec {
            identifier: "send_email".into(),
            payload: serde_json::json!({
                "to": "dave@example.com",
                "subject": "Notification"
            }),
            spec: JobSpec::default(),
        },
        RawJobSpec {
            identifier: "process_payment".into(),
            payload: serde_json::json!({ "user_id": 123, "amount": 50 }),
            spec: JobSpec::default(),
        },
    ])
    .await?;

Payload Serialization

Task payloads are serialized to JSON when jobs are added and deserialized from JSON when jobs run. The implementing type is the payload shape:

#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}

If deserialization fails, the job is treated as failed and the deserialization error is reported as the failure reason. Keep payload structs explicit and avoid depending on data that is not in the job payload unless it comes from WorkerContext.

Batch insertion can still be type-safe when every job has the same task type:

let spec = JobSpec::default();
let emails = vec![
    (
        SendEmail {
            to: "alice@example.com".into(),
            subject: "Welcome!".into(),
        },
        &spec,
    ),
    (
        SendEmail {
            to: "bob@example.com".into(),
            subject: "Welcome!".into(),
        },
        &spec,
    ),
];

utils.add_jobs::<SendEmail>(&emails).await?;

Use job specs for scheduling, priority, and other queue behavior. See Scheduling and Queues for the surrounding job controls.

Batch Handlers

BatchTaskHandler is for one database job whose payload is a JSON array of item payloads. Self is the item type, not Vec<Self>.

use graphile_worker::{
    BatchTaskHandler, IntoBatchTaskHandlerResult, JobSpecBuilder, WorkerContext,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct PendingNotification {
    user_id: String,
    message_id: String,
}

impl BatchTaskHandler for PendingNotification {
    const IDENTIFIER: &'static str = "send_notifications";

    async fn run_batch(
        items: Vec<Self>,
        _ctx: WorkerContext,
    ) -> impl IntoBatchTaskHandlerResult {
        let mut results = Vec::with_capacity(items.len());

        for item in items {
            results.push(
                send_notification(item)
                    .await
                    .map_err(|error| error.to_string()),
            );
        }

        results
    }
}

Register batch handlers with define_batch_job and add batch jobs with add_batch_job:

let worker = WorkerOptions::default()
    .define_batch_job::<PendingNotification>()
    .pg_pool(pg_pool)
    .init()
    .await?;

worker
    .create_utils()
    .add_batch_job(
        vec![
            PendingNotification {
                user_id: "1".into(),
                message_id: "a".into(),
            },
            PendingNotification {
                user_id: "1".into(),
                message_id: "b".into(),
            },
        ],
        JobSpecBuilder::new().build(),
    )
    .await?;

A batch handler can return:

  • () for complete success
  • Result<(), E> to complete or fail the whole batch
  • Vec<Result<(), E>> for per-item results
  • BatchTaskResult<E> directly

For per-item results, the vector must have the same length and order as the input items. Failed positions are retried with their original payload values; successful positions are removed from the retry payload. If the stored payload is not a JSON array, or the result count does not match the item count, the batch job fails.
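
The per-item bookkeeping can be sketched independently of the crate. BatchOutcome and apply_results are hypothetical names used only to show how positional results map back to a smaller retry payload.

```rust
/// Outcome of applying per-item results to a batch payload.
#[derive(Debug, PartialEq)]
pub enum BatchOutcome<T> {
    /// Every item succeeded; the job can be completed.
    Complete,
    /// Some items failed; only these are kept for the retry.
    Retry { remaining: Vec<T>, errors: Vec<String> },
    /// The result vector did not line up with the items.
    Invalid(String),
}

pub fn apply_results<T>(items: Vec<T>, results: Vec<Result<(), String>>) -> BatchOutcome<T> {
    if items.len() != results.len() {
        return BatchOutcome::Invalid(format!(
            "expected {} results, got {}",
            items.len(),
            results.len()
        ));
    }
    let mut remaining = Vec::new();
    let mut errors = Vec::new();
    // Results are positional: the i-th result describes the i-th item.
    for (item, result) in items.into_iter().zip(results) {
        if let Err(error) = result {
            remaining.push(item);
            errors.push(error);
        }
    }
    if remaining.is_empty() {
        BatchOutcome::Complete
    } else {
        BatchOutcome::Retry { remaining, errors }
    }
}
```

Because successful items are dropped from the retry payload, handlers should still be idempotent per item: a crash between doing the work and persisting the result can replay an item.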

WorkerContext

Both TaskHandler::run and BatchTaskHandler::run_batch receive a WorkerContext. Use the payload fields on self for job input, and use the context for worker-provided data such as job metadata and registered extensions.

#[derive(Clone, Debug)]
struct AppState {
    api_key: String,
}

impl TaskHandler for ProcessUserTask {
    const IDENTIFIER: &'static str = "process_user";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        let app_state = ctx.get_ext::<AppState>().unwrap();
        call_service(&app_state.api_key, &self.user_id).await?;
        Ok::<(), String>(())
    }
}

Keep durable job input in the payload. Keep process-local resources, clients, counters, and configuration in extensions reached through WorkerContext.

Versioning Payloads

Queued jobs may outlive the code version that created them. Treat each payload as a stored API contract:

  • keep identifiers stable unless you intentionally migrate producers and workers
  • add optional fields when older queued jobs may not contain new data
  • prefer explicit field names over positional or loosely shaped JSON
  • avoid removing or renaming required fields while old jobs can still run
  • use a new identifier when the meaning of a task changes incompatibly

These rules matter for normal jobs and batch jobs. Batch jobs store an array of payload items, so each item shape needs the same compatibility care as a normal task payload.

Worker Lifecycle

A worker is built with WorkerOptions, initialized with init(), and then started with either run() or run_once(). The builder collects configuration; init() turns that configuration into a ready Worker.

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .schema("graphile_worker")
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

worker.run().await?;

Worker::options() is an equivalent entry point if you prefer to start from the worker type:

let worker = graphile_worker::Worker::options()
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

Initialization

WorkerOptions::init() performs the setup work that must happen before jobs can be processed:

  • uses the supplied database connection, or creates one from database_url
  • runs the database migrations for the configured schema
  • registers the configured task identifiers in the database
  • creates a random worker id with the graphile_worker_ prefix
  • applies defaults such as a one second poll interval and CPU-count concurrency
  • prepares shutdown, recovery, hook, queue, and batcher state

init() returns an error if neither a database URL nor a pool was supplied, the worker cannot connect to the database, migrations fail, task registration fails, or the local queue configuration is invalid.

Database Configuration

A worker needs a PostgreSQL database before init() can succeed. You can pass a URL and let Graphile Worker RS create the pool:

let worker = graphile_worker::WorkerOptions::default()
    .database_url("postgres://postgres:password@localhost/mydb")
    .max_pg_conn(20)
    .define_job::<SendEmail>()
    .init()
    .await?;

Or you can pass an existing database connection. With the SQLx driver, pg_pool is a convenience wrapper:

let pg_pool = sqlx::postgres::PgPoolOptions::new()
    .max_connections(5)
    .connect("postgres://postgres:password@localhost/mydb")
    .await?;

let worker = graphile_worker::WorkerOptions::default()
    .pg_pool(pg_pool)
    .define_job::<SendEmail>()
    .init()
    .await?;

If both an existing database connection and a URL are provided, the existing connection is used. max_pg_conn only applies when the worker creates a pool from database_url.
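
The precedence rule can be written as a small resolution function. ExistingPool, DatabaseSource, and resolve_database are hypothetical stand-ins used to show the rule; they are not the crate's types.

```rust
/// Hypothetical stand-in for an already-built connection pool.
#[derive(Debug, PartialEq)]
pub struct ExistingPool(pub &'static str);

#[derive(Debug, PartialEq)]
pub enum DatabaseSource {
    /// Use the pool the caller passed in; max_pg_conn is ignored.
    Existing(ExistingPool),
    /// Build a new pool from the URL, honoring max_pg_conn when set.
    FromUrl { url: String, max_connections: Option<u32> },
}

pub fn resolve_database(
    pool: Option<ExistingPool>,
    url: Option<String>,
    max_pg_conn: Option<u32>,
) -> Result<DatabaseSource, String> {
    match (pool, url) {
        // An existing connection always wins over a URL.
        (Some(pool), _) => Ok(DatabaseSource::Existing(pool)),
        (None, Some(url)) => Ok(DatabaseSource::FromUrl { url, max_connections: max_pg_conn }),
        // With neither, initialization cannot proceed.
        (None, None) => Err("no database URL or pool was supplied".to_string()),
    }
}
```

When an existing pool is supplied, connection limits need to be set on that pool, since the worker does not resize it.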

The schema defaults to graphile_worker. Set a custom schema when you want to isolate worker tables:

let worker = graphile_worker::WorkerOptions::default()
    .schema("my_app_worker")
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

Task Registration

Workers only run tasks registered on the builder. The common path is define_job::<T>(), where T implements TaskHandler:

use graphile_worker::{
    IntoTaskHandlerResult, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending email to {}", self.to);
    }
}

let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .database_url("postgres://postgres:password@localhost/mydb")
    .init()
    .await?;

For reusable modules, collect definitions and register them with define_jobs. Batch handlers can be registered with define_batch_job.

Jobs with forbidden flags are skipped by that worker:

let worker = graphile_worker::WorkerOptions::default()
    .add_forbidden_flag("high_memory")
    .define_job::<SendEmail>()
    .database_url("postgres://postgres:password@localhost/mydb")
    .init()
    .await?;

Concurrency and Polling

concurrency controls the maximum number of jobs the worker processes at the same time. If it is not set, the default is the number of logical CPUs. Passing 0 panics.

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(10)
    .poll_interval(std::time::Duration::from_millis(500))
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

poll_interval controls how often the worker checks PostgreSQL for work. Polling is the fallback for missed notifications and the way jobs scheduled for the future are picked up once their run_at arrives. The default is one second.
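
The concurrency default described in this section can be sketched with the standard library. Using std::thread::available_parallelism for the logical CPU count, and falling back to 1 when it is unavailable, are assumptions; the crate may obtain the count differently. effective_concurrency is a hypothetical name.

```rust
use std::thread::available_parallelism;

/// Resolves the effective concurrency for a worker.
/// Panics when 0 is passed, matching the documented behaviour.
pub fn effective_concurrency(configured: Option<usize>) -> usize {
    match configured {
        Some(0) => panic!("concurrency must be greater than 0"),
        Some(n) => n,
        // ASSUMPTION: logical CPUs via std, falling back to 1 if unknown.
        None => available_parallelism().map(|n| n.get()).unwrap_or(1),
    }
}
```

Since 0 panics, validate user-supplied values such as environment variables before passing them to the builder.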

Running Continuously

Worker::run() is the normal long-running mode for application workers. It:

  • registers the worker for recovery bookkeeping
  • emits the worker start hook
  • starts the crontab scheduler and the job runner together
  • runs until its shutdown signal resolves or an error stops the runner
  • waits for completion and failure batchers to shut down
  • stops recovery tasks
  • emits the worker shutdown hook
  • deregisters the worker

graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?
    .run()
    .await?;

Use run() for service processes, web application sidecars, and deployments where the worker should keep listening for new jobs.

Running Once

Worker::run_once() processes currently available jobs and then returns. It uses the same configured concurrency and task handlers, but it does not start the continuous lifecycle used by run().

This is useful for scripts, tests, local maintenance commands, and one-shot workers:

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(2)
    .schema("example_simple_worker")
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;

let helpers = worker.create_utils();

for i in 0..10 {
    helpers
        .add_job(
            SayHello {
                message: format!("world {}", i),
            },
            graphile_worker::JobSpec::default(),
        )
        .await?;
}

worker.run_once().await?;

When a run-once job belongs to a queue, the worker checks for another job after releasing it so queued work can continue in order.

Shutdown

Each worker has an internal shutdown signal. By default, Graphile Worker RS also listens for OS shutdown signals such as SIGINT and SIGTERM. You can request shutdown from application code:

worker.request_shutdown();

If your host application already owns shutdown handling, pass a custom signal and disable the built-in OS listeners:

let worker = graphile_worker::WorkerOptions::default()
    .listen_os_shutdown_signals(false)
    .shutdown_signal(async {
        wait_for_application_shutdown().await;
    })
    .shutdown_grace_period(std::time::Duration::from_secs(10))
    .shutdown_interrupted_job_retry_delay(std::time::Duration::from_secs(30))
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

worker.run().await?;

The same settings can be supplied as a WorkerShutdownConfig with worker_shutdown(config).

Dropping a Worker also notifies the internal shutdown signal as a safety net, but normal applications should shut down explicitly through the configured signal or request_shutdown().

Worker IDs and the Node.js Version

Graphile Worker RS is compatible with the original Node.js Graphile Worker for the queue schema, but worker identity is different. In the Node.js version, each process has its own worker_id. In the Rust version, init() creates one worker id for the Worker instance, and jobs are processed concurrently by the enabled async runtime using that same worker id.

Scheduling Model

Graphile Worker RS schedules work by inserting rows into PostgreSQL. A job becomes runnable when it is available, unlocked, below its retry limit, and its run_at timestamp is due.
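The eligibility rule can be written as a predicate over an in-memory record. This is a simplified sketch, not the SQL the worker runs: `JobState` and `is_runnable` are hypothetical names, and the sketch covers only the three concrete checks (lock, retry limit, run_at) while leaving out queue locks and flag filtering.

```rust
use std::time::SystemTime;

/// Hypothetical, simplified view of the fields that decide eligibility.
struct JobState {
    locked: bool,
    attempts: u32,
    max_attempts: u32,
    run_at: SystemTime,
}

/// A job is runnable when it is unlocked, still below its retry limit,
/// and its run_at timestamp is not in the future.
fn is_runnable(job: &JobState, now: SystemTime) -> bool {
    !job.locked && job.attempts < job.max_attempts && job.run_at <= now
}
```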

Most Rust applications schedule jobs through WorkerUtils:

let utils = worker.create_utils();

utils.add_job(
    SendEmail {
        to: "user@example.com".to_string(),
        subject: "Welcome".to_string(),
        body: "Thanks for signing up.".to_string(),
    },
    JobSpec::default(),
).await?;

Use add_raw_job when the task identifier is only known at runtime:

utils.add_raw_job(
    "send_email",
    serde_json::json!({
        "to": "user@example.com",
        "subject": "Welcome"
    }),
    JobSpec::default(),
).await?;

JobSpec

JobSpec contains the scheduling metadata sent to the database:

use chrono::Utc;
use graphile_worker::{JobKeyMode, JobSpecBuilder};

let spec = JobSpecBuilder::new()
    .queue_name("user:123")
    .run_at(Utc::now() + chrono::Duration::minutes(5))
    .max_attempts(5)
    .job_key("welcome-email:123")
    .job_key_mode(JobKeyMode::Replace)
    .priority(-10)
    .flags(vec!["email".to_string()])
    .build();

The database fills omitted values:

| Field | Meaning | Default |
| --- | --- | --- |
| queue_name | Optional queue used to serialize related jobs. | No queue |
| run_at | Earliest timestamp at which the job may run. | now() |
| max_attempts | Maximum attempts before the job is permanently unavailable. | 25 |
| job_key | Optional deduplication key. | No key |
| job_key_mode | How an existing keyed job is handled. | Replace |
| priority | Sort key for runnable jobs. Lower numbers run first. | 0 |
| flags | Optional flags stored on the job. | No flags |

When WorkerUtils is configured with with_use_local_time(true), Rust passes the application's current UTC time for omitted run_at values. Otherwise the database uses PostgreSQL now().

Immediate And Future Jobs

JobSpec::default() schedules a job immediately. To delay a job, set run_at:

use chrono::Utc;
use graphile_worker::JobSpecBuilder;

let spec = JobSpecBuilder::new()
    .run_at(Utc::now() + chrono::Duration::hours(1))
    .build();

utils.add_job(SendEmail { /* ... */ }, spec).await?;

Workers only fetch jobs whose run_at is less than or equal to the current time used by the worker query.

Priority

Runnable jobs are selected in ascending priority order, then ascending run_at. The default priority is 0, so negative values run before default-priority jobs and positive values run later.

let urgent = JobSpecBuilder::new()
    .priority(-10)
    .build();

let normal = JobSpec::default();

Priority is not a separate queue. It only affects ordering among jobs that are otherwise available to the worker.
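The ordering rule can be illustrated with an in-memory sort. This is a sketch of the documented ordering only; the `Candidate` type and `selection_order` function are hypothetical, and run_at is simplified to seconds since the Unix epoch.

```rust
/// Hypothetical, simplified job record holding only the ordering fields.
#[derive(Debug, Clone, PartialEq)]
struct Candidate {
    id: u64,
    priority: i16,
    run_at_secs: i64, // run_at as seconds since the Unix epoch
}

/// Orders candidates by ascending priority, then ascending run_at,
/// and returns their ids in that order.
fn selection_order(mut jobs: Vec<Candidate>) -> Vec<u64> {
    jobs.sort_by_key(|j| (j.priority, j.run_at_secs));
    jobs.into_iter().map(|j| j.id).collect()
}
```

With that ordering, a job at priority -10 is selected before every priority 0 job regardless of run_at, and among jobs at the same priority the one with the earlier run_at comes first.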

Job Keys

A job_key deduplicates jobs. Adding a job whose key matches an existing job that is still available updates that row instead of creating a new one.

use graphile_worker::{JobKeyMode, JobSpecBuilder};

let spec = JobSpecBuilder::new()
    .job_key("sync-user:123")
    .job_key_mode(JobKeyMode::Replace)
    .build();

utils.add_job(SyncUser { user_id: 123 }, spec).await?;

The supported JobKeyMode values are:

| Mode | Behavior |
| --- | --- |
| Replace | Replace the existing available job's queue, task, payload, run_at, max_attempts, priority, and flags. Reset attempts and clear the last error. |
| PreserveRunAt | Replace the job, but keep the existing run_at when the existing job has not been attempted yet. |
| UnsafeDedupe | If the key already exists, only increment the revision and update updated_at; the existing payload and timing are left in place. |

For a keyed job that already exists but is no longer available, the database clears the old key and marks that old row as fully attempted before inserting the replacement.

Batch scheduling supports Replace and PreserveRunAt. UnsafeDedupe is rejected for add_jobs and add_raw_jobs. In a batch, if any job uses PreserveRunAt, the preserve-run-at behavior is applied to keyed conflicts in that batch.
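The three modes can be compared with a small in-memory model. This is a sketch of the behavior described in the table, not the database function: `KeyedJob`, `Mode`, and `apply_keyed_update` are hypothetical names, only a few fields are modeled, and updated_at is left out. Whether Replace and PreserveRunAt also change the revision is not stated here, so the sketch leaves it untouched in those modes.

```rust
/// Hypothetical, simplified model of an existing keyed job that is still available.
#[derive(Debug, Clone, PartialEq)]
struct KeyedJob {
    payload: String,
    run_at_secs: i64,
    attempts: u32,
    last_error: Option<String>,
    revision: u32,
}

/// Local stand-in for the three documented job key modes.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

/// Applies an incoming job with the same key to the existing row.
fn apply_keyed_update(existing: &mut KeyedJob, payload: &str, run_at_secs: i64, mode: Mode) {
    match mode {
        // Only the revision changes; payload and timing stay in place.
        Mode::UnsafeDedupe => existing.revision += 1,
        Mode::Replace | Mode::PreserveRunAt => {
            // PreserveRunAt keeps run_at only while the job is unattempted.
            let keep_run_at = mode == Mode::PreserveRunAt && existing.attempts == 0;
            existing.payload = payload.to_string();
            if !keep_run_at {
                existing.run_at_secs = run_at_secs;
            }
            existing.attempts = 0;
            existing.last_error = None;
        }
    }
}
```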

Queues

queue_name groups jobs that must not run in parallel. Jobs with the same queue share a row in the private job queue table, and the worker locks that queue when it fetches a queued job.

let spec = JobSpecBuilder::new()
    .queue_name("account:123")
    .build();

utils.add_job(UpdateAccount { id: 123 }, spec).await?;

Use queues for per-user, per-account, or per-resource workflows where ordering or mutual exclusion matters. Jobs without a queue are not serialized by this mechanism.

Batch Scheduling

Use add_jobs or add_raw_jobs to insert many jobs in one database round trip:

let spec = JobSpec::default();

utils.add_jobs::<SendEmail>(&[
    (SendEmail { /* ... */ }, &spec),
    (SendEmail { /* ... */ }, &spec),
]).await?;

For batch task handlers, add_batch_job stores a JSON array in a single job. When a keyed batch job is replaced and both the existing and new payloads are arrays, the database appends the new array to the existing array.
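The array-append rule for keyed batch jobs can be sketched with a simplified payload type. This is an illustration, not the database logic: `Payload` and `merge_on_replace` are hypothetical, array items are reduced to strings, and the fallback branch (the incoming payload replaces the existing one when either side is not an array) is an assumption based on the Replace mode rather than something stated here.

```rust
/// Hypothetical stand-in for a JSON payload: either an array of items or
/// any other single value.
#[derive(Debug, Clone, PartialEq)]
enum Payload {
    Array(Vec<String>),
    Other(String),
}

/// Merges payloads when a keyed batch job is replaced.
fn merge_on_replace(existing: Payload, incoming: Payload) -> Payload {
    match (existing, incoming) {
        // Both arrays: append the new items to the existing ones.
        (Payload::Array(mut old), Payload::Array(new)) => {
            old.extend(new);
            Payload::Array(old)
        }
        // Assumption: otherwise the incoming payload simply replaces the old one.
        (_, incoming) => incoming,
    }
}
```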

Cron

Cron schedules are configured on WorkerOptions, not by manually looping in application code. The typed API builds cron definitions in Rust:

use graphile_worker::{Cron, CrontabFill, WorkerOptions};

let worker = WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron(
        Cron::daily_at::<SendDailyReport>(8, 0)?
            .fill(CrontabFill::hours(1)),
    )
    .init()
    .await?;

Crontab text is also supported:

let worker = WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron("0 8 * * * send_daily_report")?
    .init()
    .await?;

On each cron tick, the runner checks which crontabs match the current local timestamp and schedules matching jobs through the same database add_job function used by one-off jobs. Cron jobs can carry payload, queue, run_at, max_attempts, priority, job key, and job key mode metadata from the crontab definition. If the payload is a JSON object, the runner adds a _cron object with the scheduled timestamp and whether the job was backfilled.

The runner records known crontabs and their last execution timestamp so a cron tick is only scheduled once for a given crontab execution.

Retry Metadata

The worker increments attempts when it locks a job to run it. A job remains available only while attempts < max_attempts.

When a job fails, the failure query stores last_error, unlocks the job, and sets a future run_at using exponential backoff:

greatest(now(), run_at) + exp(least(attempts, 10)) seconds

This means the retry delay grows exponentially with the attempt count. The exponent is capped at 10, so the delay never exceeds exp(10) seconds, which is about 22,026 seconds or a little over six hours. Once attempts reaches max_attempts, the job is no longer available for normal fetching.
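As a worked illustration of the backoff formula, the delay term can be computed directly. This is a sketch of the arithmetic only; `retry_delay_secs` is a hypothetical helper, and the real calculation happens in the database failure query.

```rust
/// Delay in seconds added after a failed attempt, following the formula
/// exp(least(attempts, 10)) quoted above.
fn retry_delay_secs(attempts: u32) -> f64 {
    f64::from(attempts.min(10)).exp()
}
```

For attempts 1, 2, and 3 this gives roughly 2.7, 7.4, and 20.1 seconds. Any attempt count above 10 produces the same delay as attempt 10.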

Existing jobs can be changed with management utilities such as rescheduling. RescheduleJobOptions can update run_at, priority, attempts, and max_attempts for selected job ids.

Queues and Concurrency

Graphile Worker RS uses PostgreSQL as the source of truth for runnable jobs, and worker concurrency decides how many jobs a worker may execute at the same time. Queues add another layer of control: they let you group jobs by workload and serialize jobs that share the same queue name.

Use queues when one class of work should not interfere with another, or when a sequence of jobs must not run in parallel.

Assigning Jobs to Queues

Jobs can be assigned to a queue with JobSpec::queue_name. The field is optional; when it is not set, the job is added without an explicit queue name.

use graphile_worker::JobSpecBuilder;

worker
    .create_utils()
    .add_job(
        SendEmail {
            to: "user@example.com".to_string(),
        },
        JobSpecBuilder::new()
            .queue_name("mail")
            .build(),
    )
    .await?;

The same option is available when building JobSpec directly:

use graphile_worker::JobSpec;

let spec = JobSpec {
    queue_name: Some("exports".to_string()),
    ..Default::default()
};

Queue names are data, not task identifiers. Different task handlers can share a queue when they must be serialized together, and the same task handler can use different queues for different tenants, accounts, or workload classes.

Serial Queues

A queue is useful when jobs for the same resource must run one at a time. For example, jobs that update the same external account can all use a queue derived from that account id:

let queue_name = format!("account:{}", account_id);

worker
    .create_utils()
    .add_job(
        SyncAccount { account_id },
        JobSpecBuilder::new()
            .queue_name(queue_name)
            .build(),
    )
    .await?;

While a queued job is in progress, Graphile Worker RS records the queue lock in the database. Tests assert that a named queue has a locked_at timestamp, locked_by worker id, and a job count while its job is running. When the job finishes, the completed job is removed and the queue can be used by later work.

This makes queue names a practical serialization primitive:

  • Use one queue name for all jobs that must run in order.
  • Use distinct queue names for jobs that may run at the same time.
  • Keep queue names stable and deterministic when they represent a shared resource.
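The serialization effect of queue names can be simulated in memory. This is a sketch of the observable behavior, not the worker's fetch logic: `Pending` and `pick_jobs` are hypothetical, and the lock is modeled as a set of queue names rather than rows in the job queue table.

```rust
use std::collections::HashSet;

/// Hypothetical pending job: an id and an optional queue name.
struct Pending {
    id: u64,
    queue: Option<String>,
}

/// Picks up to `slots` jobs in order, skipping any job whose named queue is
/// already locked and locking the queue of each job it takes.
fn pick_jobs(pending: &[Pending], locked: &mut HashSet<String>, slots: usize) -> Vec<u64> {
    let mut picked = Vec::new();
    for job in pending {
        if picked.len() == slots {
            break;
        }
        match &job.queue {
            // Another job from this queue is in progress: leave it for later.
            Some(name) if locked.contains(name) => continue,
            // First job seen for this queue: lock the queue and take the job.
            Some(name) => {
                locked.insert(name.clone());
            }
            // Jobs without a queue are not serialized by this mechanism.
            None => {}
        }
        picked.push(job.id);
    }
    picked
}
```

In this model, two jobs in queue account:1 never run together: the second is picked only after the first finishes and the queue name is removed from the locked set, while jobs in other queues or without a queue proceed independently.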

Worker Concurrency

Worker concurrency controls how many jobs a worker can execute at once.

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

With higher concurrency, independent queues can run in parallel. The concurrency tests enqueue five jobs into five different queues and configure the worker with concurrency(10); all five jobs are picked up and left in progress at the same time.

Without increasing concurrency, work is effectively drained one job at a time in the tested run_once path. That is useful for small deployments or jobs that should not overlap, but it means one slow job can delay unrelated work.

Workload Isolation

Queues and concurrency solve different parts of workload isolation.

Use queue names to protect resources:

let spec = JobSpecBuilder::new()
    .queue_name(format!("project:{}", project_id))
    .build();

Use worker concurrency to decide how many independent jobs the process can run:

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(8)
    .define_job::<RenderPreview>()
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Common patterns:

  • Per-tenant queues: tenant:42
  • Per-project queues: project:abc123
  • Per-external-resource queues: stripe:acct_123
  • Shared workload queues: mail, exports, webhooks

Choose the narrowest queue name that protects the resource you care about. A single global queue is simple, but it serializes everything behind the slowest job in that queue. Very fine-grained queue names allow more parallelism, but they only protect resources that are named consistently.

Local Queue

The local queue is an in-worker cache of jobs fetched from PostgreSQL. It exists to reduce polling and provide low-latency processing while the database remains the durable source of truth.

use graphile_worker::LocalQueueConfig;

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(3)
    .local_queue(LocalQueueConfig::builder().size(10).build())
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

LocalQueueConfig includes:

  • size: maximum number of jobs each local queue may fetch and hold at once. The default is 100.
  • ttl: how long locally fetched jobs may stay unclaimed before being returned to the database. The default is five minutes.
  • refetch_delay: an optional delay strategy used when a fetch returns fewer jobs than requested.
  • queue_count: number of independent local queues to run inside this worker. The default is 1.

When queue_count is greater than one, size applies to each local queue. For example, size = 3 and queue_count = 4 allow up to twelve jobs to be locked locally across the worker. Tests also assert that queue_count is capped by worker concurrency, because each local queue needs at least one worker draining it.

let local_queue = LocalQueueConfig::default()
    .with_size(3)
    .with_queue_count(4);

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .local_queue(local_queue)
    .define_job::<SmallFastJob>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Multiple local queues can improve throughput for very small, high-volume jobs by allowing several fetch batches in parallel. They can also lock more jobs inside one worker, so keep size lower when increasing queue_count and benchmark with realistic workloads.
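The capacity arithmetic from this section can be checked with a small helper. This is a sketch under stated assumptions: `max_locally_held_jobs` is a hypothetical function, and it assumes the cap described above means the effective queue count is the smaller of queue_count and concurrency.

```rust
/// Upper bound on jobs a worker may hold locally, assuming each local queue
/// holds up to `size` jobs and queue_count is capped by concurrency.
fn max_locally_held_jobs(size: usize, queue_count: usize, concurrency: usize) -> usize {
    let effective_queues = queue_count.min(concurrency);
    size * effective_queues
}
```

With size = 3, queue_count = 4, and concurrency = 5, the bound is twelve jobs, matching the example above. Lowering concurrency to 2 would, under the same assumption, reduce the bound to six.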

Practical Guidance

Start with worker concurrency and explicit queue names before tuning local queue internals.

  • Increase concurrency when unrelated jobs should run in parallel and the database pool, runtime, and downstream services can handle the extra work.
  • Add queue_name when jobs for the same tenant, project, account, or external system must not overlap.
  • Use separate queue names for unrelated workloads so slow serial work does not block everything else.
  • Enable and tune local_queue when polling overhead or very small jobs become a bottleneck.
  • Raise queue_count only after measuring; it increases parallel fetch capacity and the number of jobs a worker can hold locally.

Database Schema

Graphile Worker RS stores jobs and coordination state in PostgreSQL. By default it uses the graphile_worker schema, and migrations create the tables, types, indexes, functions, and views that the worker runtime needs.

Treat this schema as owned by Graphile Worker RS. Application code should add jobs through the Rust API or the installed SQL functions instead of writing directly to private tables.

Schema Name

The default schema name is graphile_worker. Migration SQL is written with a schema placeholder and executed against the configured schema, so installations can use another schema name when the worker is configured that way.

For example, the migration executor replaces the schema placeholder before it runs each SQL statement:

:GRAPHILE_WORKER_SCHEMA

Most applications can leave the default schema in place.

Installed Objects

Current migrations install and maintain these main storage tables:

| Object | Purpose |
| --- | --- |
| _private_jobs | Stores queued jobs, scheduling fields, attempts, locks, flags, keys, and errors. |
| _private_tasks | Stores task identifiers and maps them to internal task ids. |
| _private_job_queues | Stores named queues and queue lock state for serialized queue execution. |
| _private_workers | Stores worker heartbeat metadata used by stale-worker recovery. |
| migrations | Records applied migration ids and whether each migration is breaking. |

The worker also installs SQL functions used by the Rust implementation, including job insertion, completion, failure/rescheduling operations, worker heartbeat, stale-worker listing, and dead-worker recovery functions.

The exact private table layout can change across migrations. Prefer the documented APIs and the installed functions for integration points.

Public Jobs View

The migrations expose a jobs view over the private tables. It joins jobs to their task identifier and queue name so operators can inspect queue state without depending on every private table detail.

SELECT
    id,
    queue_name,
    task_identifier,
    priority,
    run_at,
    attempts,
    max_attempts,
    last_error,
    created_at,
    updated_at,
    key,
    locked_at,
    locked_by,
    revision,
    flags
FROM graphile_worker.jobs
ORDER BY priority, run_at, id;

Use this for diagnostics and operational visibility. Avoid updating through the view or relying on it as an application-owned table.

Jobs

A job row tracks the work to perform and the state needed to schedule it:

  • the task identifier, stored through _private_tasks;
  • the JSON payload;
  • optional queue membership, stored through _private_job_queues;
  • run_at, priority, attempts, and max_attempts;
  • optional key and flags;
  • lock fields, error text, revision, and timestamps.

Adding jobs through the worker APIs keeps these related records consistent. The SQL migrations also install add_job and add_jobs functions, which create task and queue rows as needed and notify workers that jobs were inserted.

use graphile_worker::{JobSpec, WorkerUtils};
use serde_json::json;

let worker_utils = WorkerUtils::new(database, "graphile_worker");

worker_utils
    .add_raw_job(
        "send_email",
        json!({ "user_id": 42 }),
        JobSpec::default(),
    )
    .await?;

Queues

Jobs may belong to a named queue. Queue rows carry lock state so the worker can serialize jobs within the same named queue while still allowing unrelated queues or unqueued jobs to make progress.

Queue names are an internal scheduling primitive, not a separate application model. Let add_job or add_jobs create queue rows as needed.

Workers

Workers register heartbeat state in _private_workers. The installed functions include:

  • worker_heartbeat, which inserts or updates a worker heartbeat and optional metadata;
  • worker_deregister, which removes a worker row;
  • list_stale_workers and list_orphan_locked_workers, which identify workers or locks that need recovery;
  • recover_dead_worker_jobs, which clears locks for selected workers and reschedules affected jobs;
  • delete_stale_workers, which removes stale worker rows.

These functions are part of the worker recovery machinery. Application code normally configures and runs workers rather than calling them directly.

Migrations

Graphile Worker RS ships its migration SQL with the crate and records applied migrations in the schema's migrations table. The current migration registry contains 20 migrations.

Migrations are designed to be repeatable after they have been applied: running the migrations again leaves existing jobs in place. The migration tests also cover taking over from a pre-existing migrations table.

Some migrations are marked as breaking in the migration metadata. The current breaking migration ids are:

1, 3, 11, 13, 14, 16

Migration 11 is intentionally cautious around locked jobs. If any jobs are locked when that migration runs, the migration fails with a dedicated locked-job error instead of silently changing active job state.

Compatibility

The worker checks migration revision compatibility after applying migrations. If the database contains a breaking migration id newer than the worker knows about, migration aborts instead of letting an older binary run against a future schema. If the newer recorded migration is not marked as breaking, the worker logs a compatibility warning and continues.

That check protects private schema compatibility. It also means deployment order matters when multiple services share the same worker schema: avoid rolling back to a binary that cannot understand the breaking schema version already installed in the database.
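The compatibility decision can be summarized as a small function. This is a sketch of the rule as described, not the crate's code: the `Compatibility` enum and `check_compatibility` function are hypothetical, and recorded migrations are modeled as (id, is_breaking) pairs.

```rust
/// Hypothetical outcome of the post-migration compatibility check.
#[derive(Debug, PartialEq)]
enum Compatibility {
    Ok,
    Warn,
    Abort,
}

/// `recorded` holds (migration id, is_breaking) pairs found in the database;
/// `latest_known` is the newest migration id this worker binary knows about.
fn check_compatibility(recorded: &[(u32, bool)], latest_known: u32) -> Compatibility {
    let mut result = Compatibility::Ok;
    for &(id, breaking) in recorded {
        if id > latest_known {
            if breaking {
                // A newer breaking migration: refuse to run against this schema.
                return Compatibility::Abort;
            }
            // A newer non-breaking migration: continue, but warn.
            result = Compatibility::Warn;
        }
    }
    result
}
```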

For operational SQL, keep queries conservative:

-- Good for inspection
SELECT id, task_identifier, run_at, attempts, max_attempts, last_error
FROM graphile_worker.jobs
WHERE attempts >= max_attempts
ORDER BY updated_at DESC;

Avoid SQL that inserts, updates, deletes, or assumes private column names in _private_* tables unless you are working on Graphile Worker RS itself.

Configuration

Graphile Worker RS is configured through WorkerOptions. Start with the few choices that define the worker's role, then add specialized options only when the deployment needs them.

Most applications need to answer these questions:

  1. Which runtime, TLS backend, and database driver should the crate compile with?
  2. Which PostgreSQL connection and schema should this worker use?
  3. Which task handlers is this process responsible for?
  4. How much work should this process run at once?
  5. Does the application need cron, lifecycle hooks, application state, local queueing, recovery, or custom shutdown behavior?

Minimal Worker

A worker needs a database connection, at least one task definition for the jobs it should process, and a call to init().

use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .database_url("postgres://user:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

worker.run().await?;

init() establishes or reuses the database connection, runs migrations for the configured schema, registers task details, creates a worker id, and returns a configured worker.

Decision Map

Use this map to find the option family that matches the decision you are making.

| Decision | Use | Notes |
| --- | --- | --- |
| Compile for Tokio or async-std | Cargo features | Default features enable Tokio, rustls, and SQLx. See Runtime, TLS, and Drivers. |
| Connect with an existing pool | database(...) or pg_pool(...) | An explicit database connection takes precedence over database_url(...). |
| Connect from a URL | database_url(...) and max_pg_conn(...) | max_pg_conn(...) applies only when the worker creates its own pool from a URL. |
| Isolate worker tables | schema(...) | Defaults to the Graphile Worker schema when not set. |
| Register handlers | define_job(...), define_batch_job(...), define_jobs(...) | Use define_jobs(...) when a module exposes reusable job definitions. |
| Limit concurrent execution | concurrency(...) | Defaults to the number of logical CPUs and must be greater than zero. |
| Tune database polling | poll_interval(...) | Defaults to one second. Notifications still provide low-latency wakeups when available. |
| Skip flagged jobs | add_forbidden_flag(...) | Workers with forbidden flags bypass local queueing and fetch directly from the database. |
| Add recurring jobs | with_cron(...) or with_crons(...) | Accepts typed cron values, raw crontabs, or crontab text depending on input. See Cron Jobs. |
| Share application state | add_extension(...) | Extensions are available from task contexts. See Application State and Extensions. |
| Observe or intercept lifecycle events | on(...) or add_plugin(...) | See Lifecycle Hooks. |
| Improve throughput with local buffering | local_queue(...) | Batch-fetches jobs into a local cache. See Local Queue. |
| Batch completion or permanent failure writes | complete_job_batch_delay(...), fail_job_batch_delay(...) | Small delays such as 1-5ms are recommended by the API docs. |
| Recover jobs locked by dead workers | worker_recovery(...) or recovery convenience setters | Recovery is opt-in. See Worker Recovery. |
| Integrate with host shutdown | worker_shutdown(...) or shutdown convenience setters | See Shutdown. |

For a complete option reference, see Worker Options.

Database Configuration

Pass a database connection directly when your application already owns the pool:

let worker = WorkerOptions::default()
    .pg_pool(pg_pool)
    .schema("graphile_worker")
    .define_job::<SendEmail>()
    .init()
    .await?;

Or let the worker create its own connection pool from a PostgreSQL URL:

let worker = WorkerOptions::default()
    .database_url("postgres://user:password@localhost/mydb")
    .max_pg_conn(20)
    .define_job::<SendEmail>()
    .init()
    .await?;

Use schema(...) when you want Graphile Worker tables in a specific PostgreSQL schema, for example to separate environments or independent worker systems in the same database.

Runtime And Driver Features

The default crate features enable:

graphile_worker = { version = "0.13", features = ["tls-rustls"] }

That default path uses Tokio, rustls, and the SQLx driver. To use async-std, disable default features and enable the async-std runtime explicitly:

graphile_worker = { version = "0.13", default-features = false, features = [
  "runtime-async-std",
  "tls-rustls",
  "driver-sqlx",
] }

Feature choices are compile-time configuration, not WorkerOptions methods. See Runtime, TLS, and Drivers and Feature Flags before changing them.

Worker Role

A process can register one task, a batch task, or a set of reusable job definitions:

let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .define_batch_job::<ImportContacts>()
    .define_jobs(reporting_jobs())
    .add_forbidden_flag("high_memory")
    .init()
    .await?;

Use add_forbidden_flag(...) for specialized workers that must refuse jobs with certain flags. This is a filtering rule for the worker process; it does not register a handler by itself.

Throughput And Latency

The primary knobs are concurrency(...), poll_interval(...), local queueing, and optional batched writes.

use std::time::Duration;

let worker = WorkerOptions::default()
    .concurrency(10)
    .poll_interval(Duration::from_millis(500))
    .complete_job_batch_delay(Duration::from_millis(5))
    .fail_job_batch_delay(Duration::from_millis(5))
    .define_job::<SendEmail>()
    .init()
    .await?;

concurrency(...) controls how many jobs can run at the same time. poll_interval(...) controls fallback polling and future scheduled job checks. For higher-throughput workloads, Local Queue can batch-fetch jobs from PostgreSQL and distribute them locally.

Scheduling

Use with_cron(...) for recurring jobs. Text input returns a result because it can fail to parse:

let worker = WorkerOptions::default()
    .define_job::<SendDigest>()
    .with_cron("0 8 * * * send_digest")?
    .init()
    .await?;

Typed cron builders and raw crontab values can also be added through with_cron(...) or with_crons(...). See Cron Jobs for the scheduling guide and Scheduling Model for the underlying concepts.

Application Integration

Use extensions for shared application state, hooks for lifecycle behavior, and shutdown settings when the host application already owns process signals:

use std::time::Duration;

let worker = WorkerOptions::default()
    .add_extension(app_config)
    .listen_os_shutdown_signals(false)
    .shutdown_signal(host_shutdown())
    .shutdown_grace_period(Duration::from_secs(5))
    .define_job::<SendEmail>()
    .init()
    .await?;

Shutdown configuration controls when the worker starts graceful shutdown, how long in-flight jobs may continue, and when shutdown-aborted jobs should be made eligible for retry. See Shutdown.

Recovery

Worker recovery is disabled by default. Enable it when your deployment needs jobs locked by crashed or unreachable workers to be released automatically:

use std::time::Duration;

let worker = WorkerOptions::default()
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30))
    .define_job::<SendEmail>()
    .init()
    .await?;

The recovery convenience setters enable recovery while setting their specific interval or delay. Use worker_recovery(...) when you want to build and pass a full recovery configuration object. See Worker Recovery.

Worker Options

WorkerOptions is the main builder used to configure a worker before calling init(). The builder collects database settings, registered task handlers, scheduling rules, hooks, and performance options, then init() connects to PostgreSQL, runs migrations, registers task details, and returns a runnable worker.

Most applications start with WorkerOptions::default(), chain the options they need, then call init().await? followed by run().await?.

use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .concurrency(5)
    .schema("graphile_worker")
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

worker.run().await?;

Database configuration

Every worker needs a database connection before init() can succeed. You can either pass an existing connection pool or let the worker create one from a database URL.

let worker = WorkerOptions::default()
    .database_url("postgres://user:password@localhost/mydb")
    .max_pg_conn(20)
    .define_job::<SendEmail>()
    .init()
    .await?;

When you already have a SQLx pool, pass it directly:

let worker = WorkerOptions::default()
    .pg_pool(pg_pool)
    .define_job::<SendEmail>()
    .init()
    .await?;

Important database methods:

  • database(value) uses an existing driver-agnostic database connection.
  • pg_pool(pool) is the SQLx convenience wrapper for passing an existing sqlx::PgPool.
  • database_url(url) stores the PostgreSQL URL used when no existing connection was provided.
  • max_pg_conn(count) sets the maximum pool size when the worker creates a pool from database_url; the default is 20.

If both an existing connection and database_url are provided, the existing connection takes precedence.

Core runtime settings

Use the core options to control where worker tables live and how aggressively the worker processes jobs.

use std::time::Duration;

let worker = WorkerOptions::default()
    .schema("my_app_worker")
    .concurrency(10)
    .poll_interval(Duration::from_millis(500))
    .use_notification_delivery(true)
    .use_local_time(false)
    .pg_pool(pg_pool)
    .define_job::<SendEmail>()
    .init()
    .await?;

Important core methods:

  • schema(name) sets the PostgreSQL schema for Graphile Worker tables. If it is not set, the default schema is graphile_worker.
  • concurrency(count) sets the maximum number of jobs processed at the same time. If it is not set, the worker uses the number of logical CPUs. Passing 0 panics.
  • poll_interval(duration) controls fallback polling for new or future jobs. The default is one second.
  • use_notification_delivery(value) controls whether the worker listens for PostgreSQL NOTIFY wakeups. The default is true; set it to false to use polling only. Notifications are treated as coalesced wakeup hints; when all workers are already saturated, extra notifications are dropped and the runner keeps draining the listener instead of blocking on worker fanout.
  • use_local_time(value) controls whether timestamps use application time (true) or PostgreSQL server time (false). The default is PostgreSQL server time.

PostgreSQL server time is the safer default when multiple worker processes run against the same database.
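To illustrate what "coalesced wakeup hints" means, here is a minimal sketch that uses a bounded standard-library channel as a stand-in for the worker's internal fanout. The helper names `wakeup_channel` and `forward_wakeup` are hypothetical, and the real implementation may be structured quite differently; the point is only that a hint is dropped, not queued, when one is already pending.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Creates a wakeup channel that holds at most one pending hint.
fn wakeup_channel() -> (SyncSender<()>, Receiver<()>) {
    sync_channel(1)
}

/// Forwards a wakeup hint without blocking. Returns `true` if the hint was
/// queued and `false` if it was dropped.
fn forward_wakeup(tx: &SyncSender<()>) -> bool {
    match tx.try_send(()) {
        Ok(()) => true,
        // A hint is already pending, so this one carries no new information.
        Err(TrySendError::Full(())) => false,
        // The receiving side is gone; nothing is left to wake.
        Err(TrySendError::Disconnected(())) => false,
    }
}
```

Because `try_send` never blocks, the listener side can keep draining notifications even when every consumer is busy. Dropping a hint is safe in this model because fallback polling still picks up any job the hint referred to.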

Job registration

Register every task handler that this worker should be able to run. The common case is one or more define_job::<T>() calls:

let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .define_job::<GenerateReport>()
    .pg_pool(pg_pool)
    .init()
    .await?;

For larger applications, modules can expose reusable job definitions and the worker can register them together:

use graphile_worker::{JobDefinition, TaskHandler};

pub fn jobs() -> [JobDefinition; 2] {
    [
        SendEmail::definition(),
        GenerateReport::definition(),
    ]
}

let worker = WorkerOptions::default()
    .define_jobs(jobs())
    .pg_pool(pg_pool)
    .init()
    .await?;

Important job methods:

  • define_job::<T>() registers a TaskHandler.
  • define_batch_job::<T>() registers a BatchTaskHandler.
  • define_jobs(iterable) registers reusable JobDefinition values.
  • add_forbidden_flag(flag) makes this worker skip jobs with that flag.

When add_forbidden_flag is used, local queue configuration is disabled during initialization.
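As a rough sketch of the forbidden-flag rule, the check below treats a job as runnable only when none of its flags are forbidden. The function name `is_runnable` is hypothetical, and in the real worker this filtering presumably happens while fetching jobs from PostgreSQL, not in Rust code like this.

```rust
/// Returns `true` when the worker may run a job: none of the job's flags
/// appear in the worker's forbidden set.
fn is_runnable(job_flags: &[&str], forbidden_flags: &[&str]) -> bool {
    !job_flags.iter().any(|flag| forbidden_flags.contains(flag))
}
```

A job with no flags is always runnable in this model, and a single forbidden flag is enough to make the worker skip a job.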

Cron schedules

Cron entries are added with with_cron or with_crons. Typed cron builders are useful when you want Rust types to define the task and payload:

use graphile_worker::{Cron, CronJobKeyMode, CrontabFill, WorkerOptions};

let worker = WorkerOptions::default()
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .with_cron(
        Cron::every_n_minutes::<SayHello>(2)?
            .fill(CrontabFill::minutes(10))
            .job_key("say_hello_dedupe")
            .job_key_mode(CronJobKeyMode::PreserveRunAt)
            .payload(SayHello {
                message: "Crontab".to_string(),
            })?,
    )
    .init()
    .await?;

Crontab text is also accepted, but because it must be parsed, the call returns a Result:

let options = WorkerOptions::default()
    .define_job::<SendDigest>()
    .with_cron("0 8 * * * send_digest")?;

Use with_crons([...]) when you already have multiple typed crontab values to append.
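The reason text crontabs return a Result can be seen in a deliberately simplified parser. The sketch below only splits the plain form used in the example above (five schedule fields followed by a task identifier); `split_crontab_line` is a hypothetical helper, and the real parser validates far more than the field count.

```rust
/// Splits a simple crontab line into its five schedule fields and the task
/// identifier. Only the plain `minute hour day month weekday task` form is
/// handled here.
fn split_crontab_line(line: &str) -> Result<([&str; 5], &str), String> {
    let parts: Vec<&str> = line.split_whitespace().collect();
    if parts.len() != 6 {
        return Err(format!("expected 6 fields, found {}", parts.len()));
    }
    let schedule = [parts[0], parts[1], parts[2], parts[3], parts[4]];
    Ok((schedule, parts[5]))
}
```

Typed builders avoid this failure mode for the task identifier because it comes from the Rust type instead of free-form text.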

Local queue

local_queue(config) enables batch-fetching jobs into an in-process local queue. This can reduce PostgreSQL load for high-throughput workers by fetching several jobs at once and distributing them locally to the worker's concurrency slots.

use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;

let worker = WorkerOptions::default()
    .concurrency(4)
    .schema("example_local_queue")
    .define_job::<ProcessItem>()
    .pg_pool(pg_pool)
    .local_queue(
        LocalQueueConfig::default()
            .with_size(50)
            .with_ttl(Duration::from_secs(60))
            .with_refetch_delay(
                RefetchDelayConfig::default()
                    .with_duration(Duration::from_millis(100))
                    .with_threshold(5)
                    .with_max_abort_threshold(20),
            ),
    )
    .init()
    .await?;

Local queue settings are validated against the configured poll_interval during init(). If forbidden flags are configured, the worker ignores the local queue configuration.
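The effect of batch fetching can be sketched with an in-memory model. Everything here is hypothetical: `LocalQueue` is not the library's type, the `source` deque stands in for PostgreSQL, and TTL and refetch-delay handling are left out. The sketch only shows how a buffer of `size` jobs reduces the number of fetches.

```rust
use std::collections::VecDeque;

/// Hypothetical in-process queue that refills itself in batches.
struct LocalQueue {
    size: usize,
    buffered: VecDeque<u64>,
    fetches: usize,
}

impl LocalQueue {
    fn new(size: usize) -> Self {
        Self { size, buffered: VecDeque::new(), fetches: 0 }
    }

    /// Returns the next job id, fetching up to `size` ids from `source`
    /// (standing in for PostgreSQL) only when the buffer is empty.
    fn next_job(&mut self, source: &mut VecDeque<u64>) -> Option<u64> {
        if self.buffered.is_empty() {
            // One simulated round trip, even if it returns no jobs.
            self.fetches += 1;
            let take = self.size.min(source.len());
            self.buffered.extend(source.drain(..take));
        }
        self.buffered.pop_front()
    }
}
```

With a size of 50 and 120 waiting jobs, this model performs four fetches in total (three that return jobs and one that finds the source empty) where fetching jobs one at a time would need one round trip per job.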

Hooks, plugins, and extensions

Worker options can also compose application state and hook behavior:

let options = WorkerOptions::default()
    .add_extension(app_config)
    .add_plugin(LocalQueueMonitorPlugin::default())
    .on(JobStart, |ctx| async move {
        tracing::info!("job {} started", ctx.job.id());
    });

Important composition methods:

  • add_extension(value) stores typed application state that handlers can read from the worker context.
  • on(event, handler) registers a hook handler directly.
  • add_plugin(plugin) registers a plugin that can attach several hooks.

Plugins are a good fit when a feature needs to register several related hooks, such as monitoring local queue events.

Completion and failure batching

For high-throughput workers, completion and permanent failure updates can be batched to reduce SQL round trips:

use std::time::Duration;

let worker = WorkerOptions::default()
    .complete_job_batch_delay(Duration::from_millis(5))
    .fail_job_batch_delay(Duration::from_millis(5))
    .define_job::<ProcessItem>()
    .pg_pool(pg_pool)
    .init()
    .await?;

complete_job_batch_delay(delay) batches successful completion updates. fail_job_batch_delay(delay) batches permanent failure updates. Retryable failures are still processed individually so retry backoff timing can be preserved.
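The routing described above can be summarized in a small decision function. This is a sketch under assumptions: `UpdatePath` and `update_path` are hypothetical names, and "permanent" is modeled as the attempt count having reached `max_attempts`.

```rust
/// How a finished job's status update is written in this sketch.
#[derive(Debug, PartialEq)]
enum UpdatePath {
    /// Buffered with other successful completions.
    CompletionBatch,
    /// Buffered with other permanent failures.
    FailureBatch,
    /// Written immediately so the retry time is computed per job.
    IndividualRetry,
}

fn update_path(succeeded: bool, attempts: u32, max_attempts: u32) -> UpdatePath {
    if succeeded {
        UpdatePath::CompletionBatch
    } else if attempts >= max_attempts {
        // No attempts remain, so the failure is permanent.
        UpdatePath::FailureBatch
    } else {
        UpdatePath::IndividualRetry
    }
}
```

Only the two batch paths are affected by the delay settings; a retryable failure takes the individual path regardless of how the delays are configured.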

Composition examples

Basic worker

This is the smallest common shape: configure the database, register jobs, and run.

let worker = WorkerOptions::default()
    .schema("example_simple_worker")
    .concurrency(2)
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;

worker.run().await?;

Scheduled worker

Scheduled workers are ordinary workers with cron entries added before initialization.

let worker = WorkerOptions::default()
    .schema("example_simple_worker")
    .concurrency(10)
    .define_job::<SayHello>()
    .define_job::<SayHello2>()
    .pg_pool(pg_pool)
    .with_cron(
        Cron::every_n_minutes::<SayHello>(2)?
            .fill(CrontabFill::minutes(10))
            .payload(SayHello {
                message: "Crontab".to_string(),
            })?,
    )
    .init()
    .await?;

Throughput-oriented worker

For job bursts, combine a tuned concurrency value with local queueing and small batching delays.

use std::time::Duration;

let worker = WorkerOptions::default()
    .schema("jobs")
    .concurrency(16)
    .poll_interval(Duration::from_millis(500))
    .complete_job_batch_delay(Duration::from_millis(5))
    .fail_job_batch_delay(Duration::from_millis(5))
    .local_queue(
        LocalQueueConfig::default()
            .with_size(100)
            .with_ttl(Duration::from_secs(60)),
    )
    .define_jobs(jobs())
    .pg_pool(pg_pool)
    .init()
    .await?;

Tune these values with your own workload, PostgreSQL latency, pool size, and job duration. Higher concurrency is useful only when the database pool and task workload can support it.

Runtime, TLS, and Drivers

Graphile Worker RS exposes three separate feature choices for database access:

  • an async runtime feature
  • a TLS backend feature
  • a PostgreSQL driver feature

The default crate features enable the common path:

[dependencies]
graphile_worker = "0.13"

This is equivalent to enabling runtime-tokio, tls-rustls, and driver-sqlx.

Feature Groups

Runtime features choose the async runtime used by the worker internals:

| Feature | Notes |
| --- | --- |
| runtime-tokio | Default runtime. Required by driver-tokio-postgres. |
| runtime-async-std | Supported with the SQLx driver. |

TLS features choose the TLS implementation passed through to the database crates:

| Feature | Notes |
| --- | --- |
| tls-rustls | Default TLS backend. |
| tls-native-tls | Native TLS backend. |

Driver features choose which PostgreSQL client integration is compiled:

| Feature | Notes |
| --- | --- |
| driver-sqlx | Default driver. Enables the optional sqlx dependency. |
| driver-tokio-postgres | Enables the tokio-postgres based driver path and also enables runtime-tokio. |

Valid Combinations

Use one runtime, one TLS backend when your driver needs TLS, and one driver. The combinations exercised by the repository test matrix are:

| Runtime | Driver | TLS in tested command | Status |
| --- | --- | --- | --- |
| runtime-tokio | driver-sqlx | tls-rustls | Default tested path. |
| runtime-async-std | driver-sqlx | tls-rustls | Tested SQLx async-std path. |
| runtime-tokio | driver-tokio-postgres | Not added by the matrix command | Tested tokio-postgres path. |

driver-tokio-postgres is Tokio-only. The repository runtime test command rejects driver-tokio-postgres with any runtime other than runtime-tokio.
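The supported combinations can be written down as a plain Rust check. The enums and `check_combination` are hypothetical and exist only to restate the rule on this page; the crate itself selects these through Cargo features, not runtime values.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Runtime {
    Tokio,
    AsyncStd,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Driver {
    Sqlx,
    TokioPostgres,
}

/// Accepts the three combinations listed in the test matrix and rejects
/// the tokio-postgres driver on any runtime other than Tokio.
fn check_combination(runtime: Runtime, driver: Driver) -> Result<(), String> {
    match (runtime, driver) {
        (Runtime::Tokio, Driver::Sqlx)
        | (Runtime::AsyncStd, Driver::Sqlx)
        | (Runtime::Tokio, Driver::TokioPostgres) => Ok(()),
        (Runtime::AsyncStd, Driver::TokioPostgres) => {
            Err("driver-tokio-postgres requires runtime-tokio".to_string())
        }
    }
}
```

The match is exhaustive, so adding a new runtime or driver variant to this sketch would force the combination table to be revisited at compile time.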

Cargo Examples

Use defaults unless you have a reason to choose another runtime or driver:

[dependencies]
graphile_worker = "0.13"

To make the default feature set explicit:

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
  "runtime-tokio",
  "tls-rustls",
  "driver-sqlx",
] }

To use SQLx with async-std:

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
  "runtime-async-std",
  "tls-rustls",
  "driver-sqlx",
] }

To use the tokio-postgres driver:

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
  "driver-tokio-postgres",
] }

driver-tokio-postgres enables runtime-tokio for Graphile Worker RS. Add a TLS feature only when the rest of your database stack needs one from this crate.

SQLx Driver Executor Arguments

With driver-sqlx, the SQL helpers accept the following SQLx executors, each exercised by the tests:

  • a pool
  • an acquired connection
  • a transaction

That means direct SQL helpers can participate in a caller-owned transaction. In the tested path, a job added inside a SQLx transaction disappears after the caller rolls the transaction back.

use graphile_worker::sql::add_job::single::add_job;
use graphile_worker::{JobSpec, Schema};
use serde_json::json;

let mut tx = pool.begin().await?;

add_job(
    &mut tx,
    &Schema::default(),
    "send_email",
    json!({ "user_id": 42 }),
    JobSpec::default(),
    false,
)
.await?;

tx.commit().await?;

The lower-level DbExecutorArg path is also tested for execute, fetch_all, and fetch_one with SQLx pool, connection, and transaction executors.

tokio-postgres Driver Executor Arguments

With driver-tokio-postgres, the SQL helpers accept the following tokio-postgres and deadpool-postgres executor arguments, each exercised by the tests:

  • tokio_postgres::Client
  • tokio_postgres::Transaction
  • deadpool_postgres::Pool
  • a deadpool_postgres client checked out from the pool

use graphile_worker::sql::add_job::single::add_job;
use graphile_worker::{JobSpec, Schema};
use serde_json::json;
use tokio_postgres::NoTls;

let (client, connection) =
    tokio_postgres::connect("postgres://postgres:postgres@localhost/postgres", NoTls).await?;

tokio::spawn(async move {
    let _ = connection.await;
});

add_job(
    &client,
    &Schema::default(),
    "send_email",
    json!({ "user_id": 42 }),
    JobSpec::default(),
    false,
)
.await?;

The tested tokio-postgres path also verifies that jobs added through a tokio-postgres transaction participate in the caller transaction and are not persisted after rollback.

Running the Matrix Locally

The project justfile defines targeted runtime and driver checks. If DATABASE_URL is not set, the runtime target delegates to the Docker-backed variant.

just test-runtime runtime-tokio driver-sqlx
just test-runtime runtime-async-std driver-sqlx
just test-runtime runtime-tokio driver-tokio-postgres

The combined Docker matrix runs the same supported combinations against a local PostgreSQL container:

just test-docker-all-matrices

Shutdown

Graphile Worker shuts down gracefully when its shutdown signal completes. By default this includes OS shutdown signals, but applications that already own process lifecycle management can provide their own future instead.

Shutdown behavior is configured with WorkerShutdownConfig or with the matching convenience methods on WorkerOptions.

Defaults

WorkerShutdownConfig::default() uses these values:

use graphile_worker::WorkerShutdownConfig;
use std::time::Duration;

let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(true)
    .grace_period(Duration::from_secs(5))
    .interrupted_job_retry_delay(Duration::from_secs(30));

The default configuration:

  • listens for OS shutdown signals
  • gives in-flight jobs 5 seconds to finish after shutdown starts
  • retries shutdown-aborted jobs after 30 seconds
  • has no custom application shutdown signal

OS Signals

When listen_os_shutdown_signals is enabled, Graphile Worker installs the platform shutdown listener provided by graphile-worker-shutdown-signal.

On Unix targets, the listener completes after one of these signals is received:

  • SIGUSR2
  • SIGINT
  • SIGPIPE
  • SIGTERM
  • SIGHUP

On Windows targets, the listener handles these console control events:

  • CTRL_C_EVENT
  • CTRL_CLOSE_EVENT
  • CTRL_LOGOFF_EVENT
  • CTRL_SHUTDOWN_EVENT

This is enabled by default:

use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .listen_os_shutdown_signals(true)
    // ... other configuration
    .init()
    .await?;

Disable OS signal listeners when the host application already installs its own handlers:

use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .listen_os_shutdown_signals(false)
    // ... other configuration
    .init()
    .await?;

Custom Shutdown Signal

A custom shutdown signal is any Future<Output = ()> + Send + 'static. The future should complete when the host application requests shutdown.

use graphile_worker::{WorkerOptions, WorkerShutdownConfig};

let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(false)
    .shutdown_signal(async {
        wait_for_application_shutdown().await;
    });

let worker = WorkerOptions::default()
    .worker_shutdown(shutdown)
    // ... other configuration
    .init()
    .await?;

worker.run().await?;

You can also set the signal directly on WorkerOptions:

use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .listen_os_shutdown_signals(false)
    .shutdown_signal(async {
        wait_for_application_shutdown().await;
    })
    // ... other configuration
    .init()
    .await?;

If both OS signal listening and a custom signal are configured, shutdown starts when either signal completes.
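The "either signal" rule is a first-one-wins race. The sketch below models it with a standard-library channel, not futures, so it can run without an async runtime; `ShutdownSource`, `shutdown_triggers`, and `wait_for_shutdown` are hypothetical names and not part of the crate.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Clone, Copy, Debug, PartialEq)]
enum ShutdownSource {
    OsSignal,
    Custom,
}

/// Creates one trigger per source plus the receiving end the worker waits on.
fn shutdown_triggers() -> (
    Sender<ShutdownSource>,
    Sender<ShutdownSource>,
    Receiver<ShutdownSource>,
) {
    let (os_tx, rx) = channel();
    let custom_tx = os_tx.clone();
    (os_tx, custom_tx, rx)
}

/// Blocks until any source fires and reports which one started shutdown.
/// Returns `None` if every trigger was dropped without firing.
fn wait_for_shutdown(rx: &Receiver<ShutdownSource>) -> Option<ShutdownSource> {
    rx.recv().ok()
}
```

Whichever trigger sends first decides when shutdown starts; later triggers are simply ignored because the waiter has already returned.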

Grace Period

grace_period controls how long in-flight jobs may continue after a shutdown signal is received.

use graphile_worker::WorkerShutdownConfig;
use std::time::Duration;

let shutdown = WorkerShutdownConfig::default()
    .grace_period(Duration::from_secs(20));

The equivalent WorkerOptions convenience method is:

use graphile_worker::WorkerOptions;
use std::time::Duration;

let worker = WorkerOptions::default()
    .shutdown_grace_period(Duration::from_secs(20))
    // ... other configuration
    .init()
    .await?;

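The shutdown signal only starts the process. After it is received, Graphile Worker drains in-flight jobs itself: they may keep running for up to the grace period, and jobs that are still running when it expires are aborted.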

Interrupted Job Retry Delay

interrupted_job_retry_delay controls when a job aborted by shutdown becomes eligible to run again.

use graphile_worker::WorkerShutdownConfig;
use std::time::Duration;

let shutdown = WorkerShutdownConfig::default()
    .interrupted_job_retry_delay(Duration::from_secs(60));

The equivalent WorkerOptions convenience method is:

use graphile_worker::WorkerOptions;
use std::time::Duration;

let worker = WorkerOptions::default()
    .shutdown_interrupted_job_retry_delay(Duration::from_secs(60))
    // ... other configuration
    .init()
    .await?;

Shutdown-aborted jobs use this delay instead of normal failure backoff, so shutdown does not consume a retry attempt. This behavior applies to ordinary graceful shutdown even when worker recovery is disabled.
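How the grace period and the interrupted-job retry delay interact can be sketched as a pure function. This is a simplified model with hypothetical names (`ShutdownOutcome`, `outcome_at_shutdown`); it assumes a job's remaining work is known up front, which a real worker cannot know.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ShutdownOutcome {
    /// The job finished within the grace period.
    Finished,
    /// The job was aborted; it becomes eligible again after `retry_in`
    /// and its attempt count is left unchanged.
    Aborted { retry_in: Duration, attempts: u32 },
}

fn outcome_at_shutdown(
    remaining_work: Duration,
    attempts_before_run: u32,
    grace_period: Duration,
    interrupted_job_retry_delay: Duration,
) -> ShutdownOutcome {
    if remaining_work <= grace_period {
        ShutdownOutcome::Finished
    } else {
        ShutdownOutcome::Aborted {
            // The fixed delay replaces the normal failure backoff.
            retry_in: interrupted_job_retry_delay,
            // Shutdown does not consume a retry attempt.
            attempts: attempts_before_run,
        }
    }
}
```

With the defaults from this page (5 second grace period, 30 second retry delay), a job needing 2 more seconds finishes, while a job needing 12 more seconds is aborted and retried later with the same attempt count.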

Application State and Extensions

Task handlers often need access to dependencies that are not part of the job payload: configuration, API clients, counters, caches, or other process-local state. Graphile Worker RS exposes this through worker extensions.

An extension is a value registered on WorkerOptions and later read from WorkerContext by type. Each handler receives a WorkerContext, so the same shared state is available wherever jobs run in that worker process.

Register Shared State

Register application state with WorkerOptions::add_extension before calling init().

use graphile_worker::WorkerOptions;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

#[derive(Clone, Debug)]
struct AppState {
    run_count: Arc<AtomicUsize>,
}

impl AppState {
    fn new() -> Self {
        Self {
            run_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    fn increment_run_count(&self) -> usize {
        self.run_count.fetch_add(1, SeqCst)
    }
}

let worker = WorkerOptions::default()
    .define_job::<ShowRunCount>()
    .pg_pool(pg_pool)
    .add_extension(AppState::new())
    .init()
    .await?;

Extension values are stored by Rust type. add_extension accepts values that are Clone + Send + Sync + Debug + 'static. If another value with the same type is inserted into the same extension set, the later value replaces the earlier one.

Use a wrapper type when you need to store two values with the same underlying type:

#[derive(Clone, Debug)]
struct PublicApiBaseUrl(String);

#[derive(Clone, Debug)]
struct InternalApiBaseUrl(String);
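To see why wrapper types are needed, it helps to picture a container keyed by Rust type. The sketch below is one common way to build such a container with the standard library; `ExtensionMap` is a hypothetical type and is not claimed to be how the crate stores extensions internally.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical type-keyed container; not the library's implementation.
#[derive(Default)]
struct ExtensionMap {
    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl ExtensionMap {
    /// Stores `value` under its Rust type, replacing any earlier value of
    /// the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.values.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Looks up a value by type, mirroring the `Option<&T>` shape of
    /// `get_ext`.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

// Wrapper types repeated from the text so the sketch is self-contained.
#[derive(Clone, Debug, PartialEq)]
struct PublicApiBaseUrl(String);

#[derive(Clone, Debug, PartialEq)]
struct InternalApiBaseUrl(String);
```

Two plain `String` values would share one slot in this map and the second would overwrite the first. The two wrapper types have distinct type ids, so both URLs can be stored and retrieved independently.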

Read State From a Handler

Inside a task handler, call ctx.get_ext::<T>() to retrieve an extension by type. The method returns Option<&T>, so handlers can decide whether missing state is a job error or an application configuration error.

use graphile_worker::{IntoTaskHandlerResult, WorkerContext};
use graphile_worker_task_handler::TaskHandler;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct ShowRunCount;

impl TaskHandler for ShowRunCount {
    const IDENTIFIER: &'static str = "show_run_count";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        let app_state = ctx
            .get_ext::<AppState>()
            .ok_or_else(|| "AppState extension is not configured".to_string())?;

        let run_count = app_state.increment_run_count();
        println!("Run count: {run_count}");

        Ok::<(), String>(())
    }
}

Handlers receive read-only access to the extension container. If the state itself must be mutated or shared across concurrent jobs, put the synchronization inside your state type, for example with Arc, atomics, or other thread-safe interior mutability.

What WorkerContext Provides

WorkerContext is the per-job context passed to every task handler. In addition to extensions, it exposes job and worker data such as:

  • ctx.payload() for the JSON payload.
  • ctx.job() for the complete job record.
  • ctx.database() for the database handle.
  • ctx.pg_pool() when the SQLx driver feature is used.
  • ctx.schema() for the configured Graphile Worker schema.
  • ctx.worker_id() for the worker processing the job.
  • ctx.get_ext::<T>() for application-specific extensions.

Use extensions for dependencies owned by your application. Use the built-in context methods for worker metadata and database access.

Context Helper Pattern

Some handlers need to enqueue follow-up work. Import WorkerContextExt to use the helper methods implemented for WorkerContext.

use chrono::{offset::Utc, Duration};
use graphile_worker::{
    IntoTaskHandlerResult, JobSpecBuilder, WorkerContext, WorkerContextExt,
};
use graphile_worker_task_handler::TaskHandler;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Clone)]
struct SendWs {
    request_id: String,
}

impl TaskHandler for SendWs {
    const IDENTIFIER: &'static str = "send_ws";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("[send_ws] sent request {}", self.request_id);

        ctx.add_job(
            CheckWs {
                request_id: self.request_id,
            },
            JobSpecBuilder::new()
                .run_at(Utc::now() + Duration::seconds(10))
                .build(),
        )
        .await
        .map_err(|e| e.to_string())?;

        Ok::<(), String>(())
    }
}

#[derive(Deserialize, Serialize, Clone)]
struct CheckWs {
    request_id: String,
}

The helper trait creates a WorkerUtils value from the current context, using the same database, schema, task details, and local-time setting as the running worker. It provides typed and raw job enqueueing helpers:

  • ctx.utils()
  • ctx.add_job(...)
  • ctx.add_raw_job(...)
  • ctx.add_jobs(...)
  • ctx.add_raw_jobs(...)
  • ctx.add_batch_job(...)

This pattern keeps handler code small: use extensions for application-owned dependencies, and use WorkerContextExt when the handler needs to schedule more work through the same worker configuration.

Guides

Use these guides when you already know the basic worker shape and want to wire Graphile Worker RS into a real application flow. If you are new to the crate, start with Quick Start or First Worker, then come back here for focused workflows.

Pick a Guide

| Goal | Start here |
| --- | --- |
| Add work to the queue from application code | Scheduling Jobs |
| Group related work and control batch behavior | Batch Jobs |
| Run recurring work on a cron schedule | Cron Jobs |
| Understand low-latency in-process job dispatch | Local Queue |
| Recover jobs after crashed or stale workers | Worker Recovery |
| Observe or customize job lifecycle events | Lifecycle Hooks |
| Inspect, retry, or manage jobs from code | Job Management |

Common Starting Points

Define a Task Handler

Most guides assume you have a task type that implements TaskHandler. A task has a stable identifier, a serializable payload, and an async run method:

use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
    body: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending email to {}", self.to);
        Ok::<(), String>(())
    }
}

For the full setup flow, see First Worker.

Register Jobs on a Worker

Register task handlers with WorkerOptions, then provide a PostgreSQL pool and initialize the worker:

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .schema("graphile_worker")
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

For configuration details, see Worker Options, Runtime, TLS, and Drivers, and Application State and Extensions.

Choose the Operational Path

After the worker is running, the guide you need depends on how jobs should be created and supervised:

  • Use Scheduling Jobs for ordinary background work such as sending emails, generating PDFs, or deferring slow application tasks.
  • Use Cron Jobs when the work should be created from a recurring schedule instead of a user request.
  • Use Worker Recovery for deployments where a process crash, network partition, forced abort, or orchestrator shutdown could leave jobs locked.
  • Use Lifecycle Hooks when you need code to run around job events.
  • Use Job Management when your application needs utility operations for existing jobs.

Scheduling Jobs

Jobs can be scheduled either from SQL or from Rust. Both paths write to the same Graphile Worker tables and use the same job options: task identifier, JSON payload, queue name, scheduled time, retry limit, job key, priority, and flags.

Use SQL when you are already inside a database migration, trigger, or stored procedure. Use WorkerUtils when scheduling from Rust application code.

Scheduling From SQL

The worker schema exposes an add_job function. The default schema is graphile_worker, so a minimal insert looks like this:

select * from graphile_worker.add_job(
    identifier => 'send_email',
    payload => '{"to":"user@example.com"}'::json
);

The Rust scheduler calls the same function with these named arguments:

select * from graphile_worker.add_job(
    identifier => 'send_email',
    payload => '{"to":"user@example.com"}'::json,
    queue_name => 'mailers',
    run_at => now() + interval '5 minutes',
    max_attempts => 10,
    job_key => 'send_email:user@example.com',
    priority => 5,
    flags => array['transactional'],
    job_key_mode => 'replace'
);

identifier must match a registered task identifier in your worker process. payload is passed to the task handler as JSON.

Scheduling From Rust

Create WorkerUtils from a running worker when you want it to share the worker's database, schema, hooks, task details, and time configuration:

let utils = worker.create_utils();

For a typed task, pass the task payload and a JobSpec. The task identifier comes from TaskHandler::IDENTIFIER, and the payload is serialized with serde_json.

use graphile_worker::{JobSpec, TaskHandler};

let job = utils
    .add_job(
        SendEmail {
            to: "user@example.com".to_string(),
            subject: "Welcome".to_string(),
        },
        JobSpec::default(),
    )
    .await?;

For dynamic scheduling, use a raw identifier and any serializable payload:

use graphile_worker::JobSpec;
use serde_json::json;

let job = utils
    .add_raw_job(
        "send_email",
        json!({
            "to": "user@example.com",
            "subject": "Welcome"
        }),
        JobSpec::default(),
    )
    .await?;

Raw jobs are useful when the caller does not know the Rust task type at compile time. Typed jobs are preferred when the task type is available because the identifier and payload type stay tied to the TaskHandler implementation.

Job Options

JobSpec controls how the job is inserted:

use chrono::{Duration, Utc};
use graphile_worker::{JobKeyMode, JobSpecBuilder};

let spec = JobSpecBuilder::new()
    .queue_name("mailers")
    .run_at(Utc::now() + Duration::minutes(5))
    .max_attempts(10)
    .job_key("send_email:user@example.com")
    .job_key_mode(JobKeyMode::Replace)
    .priority(5)
    .flags(vec!["transactional".to_string()])
    .build();

All JobSpec fields are optional. Both JobSpec::default() and JobSpec::new() produce a spec with no explicit options.

| Field | Effect |
| --- | --- |
| queue_name | Assigns the job to a named queue. |
| run_at | Delays execution until the given UTC timestamp. |
| max_attempts | Sets the maximum number of attempts before the job is permanently failed. |
| job_key | Deduplicates scheduled jobs with the same key. |
| job_key_mode | Controls how an existing keyed job is handled. |
| priority | Sets job priority. Lower numbers run sooner. |
| flags | Stores string flags on the job for worker features and custom conventions. |

When run_at is not supplied, the database function schedules the job using its default time behavior. If the worker is configured to use local application time, WorkerUtils supplies Utc::now() for jobs without an explicit run_at.

Job Keys

Set job_key when multiple scheduling attempts should refer to the same logical job.

use graphile_worker::{JobKeyMode, JobSpecBuilder};

let spec = JobSpecBuilder::new()
    .job_key("recalculate-account:42")
    .job_key_mode(JobKeyMode::Replace)
    .build();

utils.add_job(RecalculateAccount { account_id: 42 }, spec).await?;

The supported modes are:

| Mode | Behavior visible from the scheduler |
| --- | --- |
| JobKeyMode::Replace | Replaces the existing keyed job data. This is the default mode when a key is used. |
| JobKeyMode::PreserveRunAt | Updates the keyed job but keeps its existing run_at. |
| JobKeyMode::UnsafeDedupe | Deduplicates without replacing the existing run_at; supported for single-job scheduling only. |

When a keyed job is updated, the stored job is reused and its revision is incremented. If a keyed job is locked while a replacement is scheduled, the implementation retires the locked row for normal failure handling and keeps a single replacement row with the key.
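The three modes can be compared side by side with an in-memory model. This is a sketch, not the database logic: `KeyMode`, `StoredJob`, and `schedule` are hypothetical, the starting revision of 0 is an assumption, locked rows are ignored, and `UnsafeDedupe` is modeled as leaving an existing row completely untouched, which is an assumption based on how the Node.js project documents that mode.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy)]
enum KeyMode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

#[derive(Clone, Debug, PartialEq)]
struct StoredJob {
    payload: String,
    run_at: u64,
    revision: u32,
}

/// Schedules a keyed job into `jobs`, applying `mode` if the key exists.
fn schedule(
    jobs: &mut HashMap<String, StoredJob>,
    key: &str,
    payload: &str,
    run_at: u64,
    mode: KeyMode,
) {
    if let Some(existing) = jobs.get_mut(key) {
        match mode {
            KeyMode::Replace => {
                existing.payload = payload.to_string();
                existing.run_at = run_at;
                existing.revision += 1;
            }
            KeyMode::PreserveRunAt => {
                // Same update, but the original run_at is kept.
                existing.payload = payload.to_string();
                existing.revision += 1;
            }
            // Assumption: the existing row is left exactly as it was.
            KeyMode::UnsafeDedupe => {}
        }
        return;
    }
    let job = StoredJob { payload: payload.to_string(), run_at, revision: 0 };
    jobs.insert(key.to_string(), job);
}
```

In every mode the map ends up with a single row per key, which is the property job keys exist to provide; the modes differ only in what happens to the row that is already there.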

Bulk Scheduling

Use add_jobs to schedule many jobs of the same task type in one database round trip:

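use graphile_worker::{JobSpec, JobSpecBuilder};

// Lower priority values run sooner, so -10 is fetched before the default 0.
let urgent = JobSpecBuilder::new().priority(-10).build();
let normal = JobSpec::default();

// Each tuple pairs a payload with a reference to the spec it should use.
let jobs = vec![
    (SendEmail { to: "a@example.com".to_string(), subject: "Welcome".to_string() }, &urgent),
    (SendEmail { to: "b@example.com".to_string(), subject: "Welcome".to_string() }, &normal),
];

let added = utils.add_jobs::<SendEmail>(&jobs).await?;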

Each item can reference its own JobSpec, or many items can share the same spec.

Use add_raw_jobs when a single bulk insert needs heterogeneous task identifiers:

use graphile_worker::{JobSpec, RawJobSpec};
use serde_json::json;

let jobs = vec![
    RawJobSpec {
        identifier: "send_email".into(),
        payload: json!({ "to": "a@example.com" }),
        spec: JobSpec::default(),
    },
    RawJobSpec {
        identifier: "refresh_search_index".into(),
        payload: json!({ "model": "account", "id": 42 }),
        spec: JobSpec::default(),
    },
];

let added = utils.add_raw_jobs(&jobs).await?;

Empty bulk inputs return an empty Vec and do not insert jobs.

Bulk scheduling has two job-key limitations:

  • JobKeyMode::UnsafeDedupe is rejected for add_jobs and add_raw_jobs.
  • If any job in the batch uses JobKeyMode::PreserveRunAt, the batch call applies the preserve-run-at setting to the underlying database operation. Use individual add_job or add_raw_job calls when each job needs independent key-mode behavior.
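Both limitations can be expressed as one pre-flight check over the key modes in a batch. `BatchKeyMode` and `batch_preserves_run_at` are hypothetical names used only for this sketch; the crate performs its own validation.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum BatchKeyMode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

/// Returns whether the whole batch runs with preserve-run-at behavior,
/// or an error if any job requests a mode that bulk scheduling rejects.
/// `None` stands for a job that sets no key mode.
fn batch_preserves_run_at(modes: &[Option<BatchKeyMode>]) -> Result<bool, String> {
    if modes.iter().any(|mode| *mode == Some(BatchKeyMode::UnsafeDedupe)) {
        return Err("UnsafeDedupe is not supported for bulk scheduling".to_string());
    }
    // One PreserveRunAt job is enough to switch the setting on for the batch.
    Ok(modes.iter().any(|mode| *mode == Some(BatchKeyMode::PreserveRunAt)))
}
```

The return value is a single boolean for the whole batch, which is why mixing `Replace` and `PreserveRunAt` jobs in one bulk call does not give each job its own behavior.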

Attempts, Priority, And Flags

max_attempts controls the retry limit for the scheduled job. Failed jobs are retried by the worker until their attempt count reaches this limit.

priority is stored as an integer. Jobs are fetched in ascending priority order, so lower values, including negative values, run before the default priority 0.
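A short sketch of ascending-priority ordering follows. `QueuedJob` and `fetch_order` are hypothetical, and breaking ties by `id` is an assumption made for determinism; the real fetch query also considers other columns such as the scheduled time.

```rust
#[derive(Clone, Debug, PartialEq)]
struct QueuedJob {
    id: u64,
    priority: i32,
}

/// Orders jobs by ascending priority, so lower (including negative) values
/// come first. Ties fall back to `id` in this sketch.
fn fetch_order(mut jobs: Vec<QueuedJob>) -> Vec<u64> {
    jobs.sort_by_key(|job| (job.priority, job.id));
    jobs.into_iter().map(|job| job.id).collect()
}
```

Given jobs with priorities 0, -10, 5, and 0, the job with priority -10 is returned first and the job with priority 5 last.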

flags are stored as an array of strings. The scheduler does not interpret custom flag names while adding jobs; it passes them through to the database.

For changing jobs after they have been scheduled, see Job Management.

Batch Jobs

Graphile Worker RS has two batch-related APIs:

  • add_jobs and add_raw_jobs insert many ordinary jobs in one call.
  • BatchTaskHandler processes a JSON array stored in one job payload.

Use bulk adds when you want to enqueue many independent jobs efficiently. Use a batch handler when one task run should receive a group of items and decide which items completed or failed.

Bulk Add Ordinary Jobs

WorkerUtils::add_jobs inserts multiple jobs for the same typed task. Each item is still stored as its own job row and is processed by the normal TaskHandler implementation.

use graphile_worker::{JobSpec, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Clone, Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> Result<(), String> {
        send_email(self.to, self.subject).await
    }
}

let spec = JobSpec::default();
let jobs = [
    (
        SendEmail {
            to: "alice@example.com".into(),
            subject: "Welcome!".into(),
        },
        &spec,
    ),
    (
        SendEmail {
            to: "bob@example.com".into(),
            subject: "Welcome!".into(),
        },
        &spec,
    ),
];

let added_jobs = worker.create_utils().add_jobs::<SendEmail>(&jobs).await?;

The JobSpec is supplied per job. You can reuse one spec for all jobs or pass different specs to set different priorities, queues, run times, or other job options.

use graphile_worker::{JobSpec, JobSpecBuilder};

let urgent = JobSpecBuilder::new().priority(-10).build();
let normal = JobSpec::default();

worker
    .create_utils()
    .add_jobs::<SendEmail>(&[
        (urgent_email, &urgent),
        (normal_email, &normal),
    ])
    .await?;

For heterogeneous batches, use add_raw_jobs. Each RawJobSpec carries its own task identifier, JSON payload, and JobSpec.

use graphile_worker::{JobSpec, JobSpecBuilder, RawJobSpec};
use serde_json::json;

let added_jobs = worker
    .create_utils()
    .add_raw_jobs(&[
        RawJobSpec {
            identifier: "send_email".into(),
            payload: json!({
                "to": "dave@example.com",
                "subject": "Notification"
            }),
            spec: JobSpec::default(),
        },
        RawJobSpec {
            identifier: "process_payment".into(),
            payload: json!({ "user_id": 123, "amount": 50 }),
            spec: JobSpecBuilder::new().priority(-10).build(),
        },
    ])
    .await?;

Bulk-added ordinary jobs are fetched and run like any other job. They do not call run_batch.

Batch Handlers

A batch job is one job row whose payload is a JSON array. Register it with define_batch_job and implement BatchTaskHandler for the item type stored in that array.

use graphile_worker::{
    BatchTaskHandler, IntoBatchTaskHandlerResult, JobSpec, WorkerContext,
    WorkerOptions,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct PendingNotification {
    user_id: String,
    message_id: String,
}

impl BatchTaskHandler for PendingNotification {
    const IDENTIFIER: &'static str = "send_notifications";

    async fn run_batch(
        items: Vec<Self>,
        _ctx: WorkerContext,
    ) -> impl IntoBatchTaskHandlerResult {
        let mut results = Vec::with_capacity(items.len());

        for item in items {
            results.push(
                send_notification(item)
                    .await
                    .map_err(|error| error.to_string()),
            );
        }

        results
    }
}

let worker = WorkerOptions::default()
    .define_batch_job::<PendingNotification>()
    // ... database pool and other options
    .init()
    .await?;

worker
    .create_utils()
    .add_batch_job(
        vec![
            PendingNotification {
                user_id: "1".into(),
                message_id: "a".into(),
            },
            PendingNotification {
                user_id: "1".into(),
                message_id: "b".into(),
            },
        ],
        JobSpec::default(),
    )
    .await?;

The stored payload for that job is an array:

[
  { "user_id": "1", "message_id": "a" },
  { "user_id": "1", "message_id": "b" }
]

add_batch_job requires at least one item. If the final payload is not a JSON array, or is an empty array, the job is rejected before insertion.

Return Values and Retries

Batch handlers can report success for the whole batch or per item.

Returning Ok(()), (), or BatchTaskResult::Complete completes the whole job.

impl BatchTaskHandler for PendingNotification {
    const IDENTIFIER: &'static str = "send_notifications";

    async fn run_batch(
        _items: Vec<Self>,
        _ctx: WorkerContext,
    ) -> impl IntoBatchTaskHandlerResult {
        Ok::<(), String>(())
    }
}

Returning one result per payload item lets Graphile Worker RS retry only the failed items. The result vector must have the same length as the input items.

impl BatchTaskHandler for PendingNotification {
    const IDENTIFIER: &'static str = "send_notifications";

    async fn run_batch(
        items: Vec<Self>,
        _ctx: WorkerContext,
    ) -> impl IntoBatchTaskHandlerResult {
        items
            .into_iter()
            .map(|item| {
                if should_retry(&item) {
                    Err(format!("failed {}", item.message_id))
                } else {
                    Ok(())
                }
            })
            .collect::<Vec<_>>()
    }
}

If only some item results fail, the worker removes successful items from the payload and leaves a retry job containing only the failed items. If the whole handler fails, or if the result vector length does not match the number of input items, the original payload is retried.
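
The partial-retry rule above can be modelled in a few lines of plain Rust. This is only a sketch of the decision, not the worker's actual code: retry_payload is a hypothetical helper, and it assumes the handler produced one Result per item in input order.

```rust
/// Hypothetical helper: decide which items a retry job should carry.
/// Returns `None` when every item succeeded and no retry is needed.
fn retry_payload<T: Clone>(items: &[T], results: &[Result<(), String>]) -> Option<Vec<T>> {
    // A length mismatch cannot be mapped back to items, so retry everything.
    if results.len() != items.len() {
        return Some(items.to_vec());
    }

    // Keep only the items whose result was an error.
    let failed: Vec<T> = items
        .iter()
        .zip(results)
        .filter(|(_, result)| result.is_err())
        .map(|(item, _)| item.clone())
        .collect();

    if failed.is_empty() {
        None
    } else {
        Some(failed)
    }
}
```

With three items and a failure on the second, the sketch keeps only the second item for the retry. A result vector of the wrong length keeps all three.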

Invalid stored payloads are treated as job failures:

  • A batch handler requires the job payload to be a JSON array.
  • Every array item must deserialize into the batch item type.
  • Deserialization errors leave the original payload on the retried job.

Adding Batch Jobs From a Handler

WorkerContext can enqueue a typed batch job from inside another task.

impl TaskHandler for ParentJob {
    const IDENTIFIER: &'static str = "batch_parent_job";

    async fn run(self, ctx: WorkerContext) -> Result<(), String> {
        ctx.add_batch_job(
            vec![
                PendingNotification {
                    user_id: "1".into(),
                    message_id: "a".into(),
                },
                PendingNotification {
                    user_id: "1".into(),
                    message_id: "b".into(),
                },
            ],
            JobSpec::default(),
        )
        .await
        .map(|_| ())
        .map_err(|error| error.to_string())
    }
}

The inserted job uses the batch task identifier and stores the provided items as one JSON array payload.

Internal Completion and Failure Batching

The worker also batches some internal persistence work when completing or failing jobs. Completion and failure requests are collected for a configured delay, then flushed together. On shutdown, the batcher drains queued requests and flushes them before exiting. If the batcher has already closed, the worker falls back to direct completion or failure persistence for that job.

This internal persistence batching is separate from BatchTaskHandler: it does not change how many payload items your handler receives.

Cron Jobs

Cron jobs schedule registered tasks from cron-like definitions. They are useful for recurring work such as cleanup, reporting, reconciliation, and periodic synchronization.

Cron entries can be added with typed Rust builders or with crontab text:

  • WorkerOptions::with_cron(...) accepts one cron entry: a typed builder, a raw Crontab value, or crontab text.
  • WorkerOptions::with_crons(...) accepts multiple typed or raw Crontab values.
  • WorkerOptions::with_crontab(...) accepts crontab text, but is deprecated in favor of with_cron(...).

Typed cron builders

Use Cron when the schedule belongs to a Rust TaskHandler. The task identifier comes from T::IDENTIFIER, so it stays aligned with the job you registered with define_job.

use graphile_worker::{
    Cron, CrontabFill, IntoTaskHandlerResult, TaskHandler, WorkerContext,
    WorkerOptions,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendDigest {
    account_id: i64,
}

impl TaskHandler for SendDigest {
    const IDENTIFIER: &'static str = "send_digest";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        // Send the digest.
    }
}

let worker = WorkerOptions::default()
    .define_job::<SendDigest>()
    .with_cron(
        Cron::daily_at::<SendDigest>(8, 0)?
            .id("send_digest_morning")
            .fill(CrontabFill::hours(1))
            .payload(SendDigest { account_id: 42 })?,
    )
    .pg_pool(pg_pool)
    .init()
    .await?;

Typed schedules are built with:

Cron::every_minute::<MyTask>()
Cron::every_n_minutes::<MyTask>(5)?
Cron::hourly_at::<MyTask>(15)?
Cron::daily_at::<MyTask>(8, 0)?
Cron::weekly_on::<MyTask>(chrono::Weekday::Mon, 8, 0)?
Cron::monthly_on::<MyTask>(1, 8, 0)?
Cron::yearly_on::<MyTask>(1, 1, 8, 0)?

The typed constructors validate field ranges. Minutes are 0..=59, hours are 0..=23, days of month are 1..=31, and months are 1..=12.

Builder options include:

Cron::every_n_minutes::<SendDigest>(10)?
    .id("send_digest_every_10_minutes")
    .fill(CrontabFill::minutes(30))
    .max_attempts(5)
    .queue("digest")
    .priority(-10)
    .job_key("send_digest_dedupe")
    .job_key_mode(graphile_worker::CronJobKeyMode::PreserveRunAt)
    .payload(SendDigest { account_id: 42 })?;

Use payload(...) for a typed payload that serializes as JSON. Use payload_value(...) when you already have a serde_json::Value.

Multiple cron entries

Use with_crons for a collection of typed builders or Crontab values:

let worker = WorkerOptions::default()
    .define_job::<SendDigest>()
    .define_job::<SyncMetrics>()
    .with_crons([
        Cron::daily_at::<SendDigest>(8, 0)?.build(),
        Cron::every_n_minutes::<SyncMetrics>(15)?.build(),
    ])
    .pg_pool(pg_pool)
    .init()
    .await?;

Crontab text

with_cron also accepts crontab text. In that case it returns a Result, because the string is parsed at runtime and may be invalid.

let worker = WorkerOptions::default()
    .define_job::<SendDigest>()
    .with_cron(
        r#"
        # Run at 08:00 UTC every day.
        0 8 * * * send_digest ?id=send_digest_morning&fill=1h {account_id:42}
        "#,
    )?
    .pg_pool(pg_pool)
    .init()
    .await?;

The text format is:

* * * * * task_identifier ?options {payload}

The five schedule fields are UTC minute, UTC hour, UTC day of month, UTC month, and UTC day of week. Days of week use 0..=6.

Supported schedule values are:

  • * for every valid value.
  • */n for every value divisible by n.
  • n for one explicit value.
  • a-b for an inclusive range.
  • Comma-separated combinations such as 0,15,30,45 or 4,10-15.

Examples:

# Every minute.
* * * * * tick

# Every 4 hours at minute 0.
0 */4 * * * rollup

# Mondays at 04:30 UTC.
30 4 * * 1 send_weekly_email

# More than one value in a field.
30 4,10-15 * * 1 send_weekly_email
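
To make the schedule syntax concrete, here is a minimal sketch that expands one field into the values it matches. expand_field is a hypothetical function written for illustration. It is not the parser the crate uses, and it reads "*/n" literally as "values divisible by n", as described above.

```rust
/// Expand one crontab field into the sorted list of values it matches.
/// `min` and `max` are the inclusive bounds of the field (0 and 59 for minutes).
fn expand_field(field: &str, min: u32, max: u32) -> Result<Vec<u32>, String> {
    let mut values = Vec::new();

    for part in field.split(',') {
        if part == "*" {
            // Every valid value.
            values.extend(min..=max);
        } else if let Some(step) = part.strip_prefix("*/") {
            // Every value divisible by n.
            let n: u32 = step.parse().map_err(|_| format!("invalid step: {part}"))?;
            if n == 0 {
                return Err(format!("step must be positive: {part}"));
            }
            values.extend((min..=max).filter(|v| v % n == 0));
        } else if let Some((a, b)) = part.split_once('-') {
            // Inclusive range a-b.
            let a: u32 = a.parse().map_err(|_| format!("invalid range: {part}"))?;
            let b: u32 = b.parse().map_err(|_| format!("invalid range: {part}"))?;
            if a > b || a < min || b > max {
                return Err(format!("range out of bounds: {part}"));
            }
            values.extend(a..=b);
        } else {
            // One explicit value.
            let n: u32 = part.parse().map_err(|_| format!("invalid value: {part}"))?;
            if n < min || n > max {
                return Err(format!("value out of bounds: {part}"));
            }
            values.push(n);
        }
    }

    values.sort_unstable();
    values.dedup();
    Ok(values)
}
```

Under these assumptions, the hour field 4,10-15 from the last example expands to 4, 10, 11, 12, 13, 14, and 15.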

Task identifiers must start with a letter or underscore. After that they may contain letters, numbers, colon, underscore, or hyphen.
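
The identifier rule can be checked with a small predicate. This sketch assumes "letters" and "numbers" mean ASCII characters, which the text above does not state. is_valid_task_identifier is a hypothetical name, not part of the crate.

```rust
/// Returns true when `identifier` starts with a letter or underscore and
/// continues with letters, digits, ':', '_' or '-' only (ASCII assumed).
fn is_valid_task_identifier(identifier: &str) -> bool {
    let mut chars = identifier.chars();

    // The first character must be a letter or an underscore.
    match chars.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {}
        _ => return false,
    }

    // Every remaining character must come from the allowed set.
    chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, ':' | '_' | '-'))
}
```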

Options

Crontab text options use query-string syntax and must start with ?.

0 8 * * * send_digest ?id=send_digest_morning&fill=1h&max=5&queue=digest&priority=10

Supported options are:

  • id: stable identifier for this cron entry. By default the task identifier is used. Set this when the same task has more than one schedule.
  • fill: backfill duration for missed executions.
  • max: overrides the job's maximum attempts.
  • queue: schedules the job in a named queue.
  • priority: overrides the job priority.
  • job_key: sets a job key for deduplication.
  • job_key_mode: controls an existing unlocked job with the same key. Supported serialized values are replace and preserve_run_at.

Changing a cron entry identifier can cause the worker to treat it as a new cron entry. Prefer setting a stable id when a task has multiple cron schedules.

Payloads

Cron jobs receive a JSON object payload. The worker adds a _cron object to the payload for each scheduled job:

{
  "_cron": {
    "ts": "2026-06-16T08:00:00",
    "backfilled": false
  }
}

ts is the timestamp when the cron execution was due, and backfilled is true when the job was scheduled as a backfill.

With typed builders, set the payload with payload(...) or payload_value(...). With crontab text, write a JSON5 object after the options:

0 8 * * * send_digest ?id=send_digest_morning {account_id:42, urgent:false}

The parsed payload is merged with the default cron payload properties. The text payload must start with { and stay on one line.

Backfill and fill

By default, cron entries do not backfill missed executions. Add fill to ask the worker to schedule missed runs from a recent time window when it starts or recovers a known cron entry.

Typed builder:

Cron::every_n_minutes::<SyncMetrics>(10)?
    .id("sync_metrics")
    .fill(CrontabFill::minutes(30))

Crontab text:

*/10 * * * * sync_metrics ?id=sync_metrics&fill=30m

Fill durations are made from time units:

  • s: seconds
  • m: minutes
  • h: hours
  • d: days
  • w: weeks

Units may be combined in order, for example 1w2d3h30m. In Rust, use CrontabFill::seconds, minutes, hours, days, weeks, or CrontabFill::new(w, d, h, m, s).
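
As an illustration of the fill syntax, the sketch below converts a fill string into seconds. parse_fill_seconds is a hypothetical helper. It assumes "in order" means each unit may appear at most once, from weeks down to seconds, and the crate's own parser may be stricter or more lenient.

```rust
/// Parse a fill string such as "1w2d3h30m" into a number of seconds.
fn parse_fill_seconds(input: &str) -> Result<u64, String> {
    // Units in the order they may appear, with their length in seconds.
    const UNITS: [(char, u64); 5] =
        [('w', 604_800), ('d', 86_400), ('h', 3_600), ('m', 60), ('s', 1)];

    let mut total = 0u64;
    let mut next_unit = 0; // index of the first unit that is still allowed
    let mut digits = String::new();

    for c in input.chars() {
        if c.is_ascii_digit() {
            digits.push(c);
            continue;
        }
        // Only search units that have not been passed yet; this enforces the order.
        let offset = UNITS[next_unit..]
            .iter()
            .position(|(unit, _)| *unit == c)
            .ok_or_else(|| format!("unexpected unit '{c}' in {input:?}"))?;
        let amount: u64 = digits
            .parse()
            .map_err(|_| format!("missing number before '{c}' in {input:?}"))?;
        total = amount
            .checked_mul(UNITS[next_unit + offset].1)
            .and_then(|value| total.checked_add(value))
            .ok_or_else(|| format!("fill duration too large: {input:?}"))?;
        next_unit += offset + 1;
        digits.clear();
    }

    // Reject empty input and trailing digits without a unit.
    if !digits.is_empty() || next_unit == 0 {
        return Err(format!("incomplete fill duration: {input:?}"));
    }
    Ok(total)
}
```

Under these assumptions 1w2d3h30m is 790200 seconds, and 30m1h is rejected because the units are out of order.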

Backfill only applies to cron entries that were already known to the worker. When a cron entry is first registered, the worker records it as known but does not immediately create old jobs for it. Tests cover this behavior by asserting that a newly registered cron entry has no last_execution and creates no backfill jobs.

When a known cron entry has a previous execution timestamp, the worker can catch up missed matching schedule times inside the configured fill window. A four-hour cron with fill=1d, for example, can create the missed four-hour executions from the last day. Larger fill windows can schedule more jobs and take longer at startup.
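
The catch-up arithmetic can be sketched with plain Unix timestamps. This is a simplified model, not the runner's implementation: missed_executions is hypothetical, it only handles a fixed interval aligned to the epoch, and it assumes the window excludes the last recorded execution and includes the current time.

```rust
/// List the schedule times (in seconds) that fall inside the fill window
/// and after the last recorded execution, up to and including `now`.
fn missed_executions(
    last_execution: u64,
    now: u64,
    fill_secs: u64,
    interval_secs: u64,
) -> Vec<u64> {
    assert!(interval_secs > 0, "interval must be positive");

    // Never look back further than the fill window or the last execution.
    let window_start = now.saturating_sub(fill_secs).max(last_execution);

    // First schedule time strictly after `window_start`.
    let first = (window_start / interval_secs + 1) * interval_secs;

    (first..=now).step_by(interval_secs as usize).collect()
}
```

For a four-hour interval, a one-day fill, and a last execution three days ago, this model yields six catch-up times. With a fill of zero it yields none, which mirrors the default of no backfill.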

Runtime behavior

The cron runner checks schedule times and inserts regular jobs for matching entries. If time advances by multiple matching intervals, the runner catches up by scheduling the missed ticks it observes. Jobs are then processed by the same registered task handlers as jobs added through the regular job APIs.

Local Queue

Local Queue is an optional worker-side cache for high-throughput workloads. When enabled, the worker fetches a batch of jobs from PostgreSQL, keeps them locked locally, and lets worker concurrency drain that local batch without another database round trip for every job.

Use it when many small jobs make database fetch overhead noticeable. For slow or rare jobs, the default direct database fetch path is usually simpler and keeps less work locked inside a single process.

Basic Configuration

Enable the feature with WorkerOptions::local_queue:

use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;

let worker = WorkerOptions::default()
    .concurrency(4)
    .local_queue(
        LocalQueueConfig::default()
            .with_size(100)
            .with_ttl(Duration::from_secs(300))
            .with_refetch_delay(
                RefetchDelayConfig::default()
                    .with_duration(Duration::from_millis(100))
                    .with_threshold(10)
                    .with_max_abort_threshold(500),
            ),
    )
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

The same settings are available through the generated builders:

use graphile_worker::{LocalQueueConfig, RefetchDelayConfig};
use std::time::Duration;

let config = LocalQueueConfig::builder()
    .size(100)
    .ttl(Duration::from_secs(300))
    .refetch_delay(
        RefetchDelayConfig::builder()
            .duration(Duration::from_millis(100))
            .threshold(10)
            .max_abort_threshold(500)
            .build(),
    )
    .build();

Config Fields

size controls the maximum number of jobs fetched and held by each local queue. The default is 100. It must be greater than zero and must not exceed i32::MAX.

ttl controls how long fetched jobs may remain unclaimed in the local queue before they are returned to PostgreSQL. The default is five minutes.

refetch_delay is optional. It slows the next fetch after a low-yield fetch so the worker does not immediately poll the database again when the queue appears empty or nearly empty.

queue_count controls how many independent local queues run inside one worker. The default is 1. It must be greater than zero. size applies per queue, so the maximum local capacity is size * queue_count.

If queue_count is greater than worker concurrency, it is capped at the concurrency value because each local queue needs a worker draining it.
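
The capacity rule is simple arithmetic. The two functions below are hypothetical helpers that restate it, assuming the cap is applied before the capacity is computed:

```rust
/// Number of local queues that actually run: capped at worker concurrency.
fn effective_queue_count(queue_count: usize, concurrency: usize) -> usize {
    queue_count.min(concurrency)
}

/// Upper bound on jobs held locally: `size` applies to each running queue.
fn max_local_capacity(size: usize, queue_count: usize, concurrency: usize) -> usize {
    size * effective_queue_count(queue_count, concurrency)
}
```

For example, size 50 with queue_count 4 gives a capacity of 200 at concurrency 4, but only 100 at concurrency 2.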

Modes

Each local queue moves through a small state machine:

  • Polling: the queue may fetch jobs from PostgreSQL.
  • Waiting: jobs are cached locally and are being served from memory.
  • TtlExpired: the local cache TTL expired and unclaimed jobs are being returned to PostgreSQL.
  • Released: the local queue has shut down and will not provide more jobs.

When the cached batch is drained, the queue returns to Polling. When TTL expires, unclaimed jobs are returned to PostgreSQL and later fetches can poll again.
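
One way to picture these transitions is a small transition function. The Mode variants mirror the list above, but the Event names are invented for this sketch and the real queue may have more transitions than the ones shown here.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Mode {
    Polling,
    Waiting,
    TtlExpired,
    Released,
}

// Hypothetical events; the crate does not necessarily expose these names.
#[derive(Clone, Copy, Debug)]
enum Event {
    JobsFetched,
    BatchDrained,
    TtlElapsed,
    JobsReturned,
    Shutdown,
}

fn next_mode(mode: Mode, event: Event) -> Mode {
    match (mode, event) {
        // Released is final: the queue provides no more jobs.
        (Mode::Released, _) => Mode::Released,
        (_, Event::Shutdown) => Mode::Released,
        // A fetch fills the cache, which is then served from memory.
        (Mode::Polling, Event::JobsFetched) => Mode::Waiting,
        // A drained cache lets the queue poll PostgreSQL again.
        (Mode::Waiting, Event::BatchDrained) => Mode::Polling,
        // TTL expiry returns unclaimed jobs, after which polling can resume.
        (Mode::Waiting, Event::TtlElapsed) => Mode::TtlExpired,
        (Mode::TtlExpired, Event::JobsReturned) => Mode::Polling,
        // Any other combination leaves the mode unchanged.
        (current, _) => current,
    }
}
```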

Refetch Delay

Refetch delay starts when a fetch returns fewer jobs than requested and the number of jobs fetched is at or below the configured threshold.

use graphile_worker::{LocalQueueConfig, RefetchDelayConfig};
use std::time::Duration;

let config = LocalQueueConfig::default().with_refetch_delay(
    RefetchDelayConfig::default()
        .with_duration(Duration::from_millis(200))
        .with_threshold(5),
);

The actual delay is jittered between half of duration and the full duration. duration must not be larger than the worker poll_interval.

PostgreSQL notifications can wake the local queue sooner. While a refetch delay is active, incoming job pulses increment an internal counter. When that counter reaches the abort threshold, the delay is aborted and the queue fetches again. If max_abort_threshold is not set, the abort threshold is randomized up to 5 * size. If it is set, the randomized abort threshold is capped by that value.
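
The three rules in this section can be written down as small functions. This is a sketch under assumptions: the function names are hypothetical, the random input is passed in as a fraction between 0 and 1 so that no random number crate is needed, and the exact distribution the worker uses is not specified above.

```rust
use std::time::Duration;

/// A delay starts only after a short fetch that is at or below the threshold.
fn should_delay_refetch(fetched: usize, requested: usize, threshold: usize) -> bool {
    fetched < requested && fetched <= threshold
}

/// Jitter the delay between half of `duration` and the full `duration`.
/// `random` is a caller-supplied fraction in the range 0.0..=1.0.
fn jittered_delay(duration: Duration, random: f64) -> Duration {
    duration.mul_f64(0.5 + 0.5 * random.clamp(0.0, 1.0))
}

/// Randomize the abort threshold up to 5 * size, capped by the optional maximum.
fn abort_threshold(size: usize, max_abort_threshold: Option<usize>, random: f64) -> usize {
    let randomized = (random.clamp(0.0, 1.0) * (5 * size) as f64) as usize;
    match max_abort_threshold {
        Some(max) => randomized.min(max),
        None => randomized,
    }
}
```

With size 100 and a random fraction of 0.5, the uncapped abort threshold is 250. Setting max_abort_threshold to 200 lowers it to 200.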

Distribution

Local Queue does not change job ownership rules. Jobs are still locked in PostgreSQL under the worker id, and the worker's normal concurrency controls how many handlers can run at the same time.

With a single local queue, one batch is shared by the worker tasks. With queue_count greater than one, the worker starts multiple independent local queues and checks them round-robin. This can improve throughput for very small, high-volume jobs, but it can also lock more work inside one process:

use graphile_worker::LocalQueueConfig;

let config = LocalQueueConfig::default()
    .with_size(50)
    .with_queue_count(4);

In this example, up to 200 jobs may be locked locally if worker concurrency is at least 4.

Shutdown And TTL

Local Queue returns unclaimed cached jobs to PostgreSQL in two cases:

  • the queue TTL expires while jobs are still cached;
  • the worker shuts down and releases the local queue.

Returning jobs is retried internally. Jobs that are already running are handled by the worker's normal shutdown behavior; the local queue release path is for jobs that were fetched into the local cache but not yet claimed by a handler.

Choose ttl based on how much work you are comfortable locking inside one process if handlers are slower than the local batch drain rate.

Forbidden Flags

Workers configured with forbidden flags do not use the local queue cache. During worker initialization, a non-empty forbidden flag list disables the local queue configuration for that worker.

This preserves flag filtering semantics: jobs with forbidden flags are skipped by the worker, and eligible jobs are fetched directly from PostgreSQL with the forbidden flag filter applied.

Worker Recovery

Worker recovery is an opt-in safety mechanism for jobs that remain locked after a worker stops unexpectedly. It is useful when a process crashes, is aborted, loses database connectivity, or is terminated by an orchestrator before it can release its in-flight jobs.

When recovery is enabled, workers write heartbeat rows to PostgreSQL. A sweeper then looks for workers whose heartbeat is stale and returns their locked jobs to the queue.

Enabling Recovery

Recovery is disabled by default. You can enable it with WorkerRecoveryConfig:

use graphile_worker::{WorkerOptions, WorkerRecoveryConfig};
use std::time::Duration;

let recovery = WorkerRecoveryConfig::default()
    .enabled(true)
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30));

let worker = WorkerOptions::default()
    .worker_recovery(recovery)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

The convenience setters on WorkerOptions also enable recovery:

use graphile_worker::WorkerOptions;
use std::time::Duration;

let worker = WorkerOptions::default()
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30))
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Configuration

WorkerRecoveryConfig::default() uses these values:

Option | Default | Meaning
enabled | false | Whether heartbeat registration and sweeping are active.
heartbeat_interval | 30 seconds | How often an enabled worker updates its heartbeat row.
sweep_interval | 60 seconds | How often the background sweeper checks for inactive workers.
sweep_threshold | 5 minutes | How old a heartbeat must be before the worker is considered inactive.
recovery_delay | 30 seconds | Delay before recovered jobs are eligible to run again.
resilient_sweep_threshold_multiplier | 3 | Multiplier applied to sweep_threshold for workers holding resilient jobs.
resilient_job_flags | ["infrastructure_resilient"] | Job flags that activate the extended threshold.

Each builder method on WorkerRecoveryConfig stores the new value and enables recovery, except .enabled(false), which explicitly disables it again.

Heartbeats

An enabled worker registers itself in the private workers table and refreshes last_heartbeat_at at the configured heartbeat_interval.

You can inspect registered workers with WorkerUtils::list_active_workers. The stale state is calculated from the threshold you pass to that method:

use std::time::Duration;

let workers = utils
    .list_active_workers(Duration::from_secs(60))
    .await?;

for worker in workers {
    println!(
        "worker={} stale={} started_at={} last_heartbeat_at={}",
        worker.worker_id,
        worker.is_stale,
        worker.started_at,
        worker.last_heartbeat_at
    );
}

Sweeping Stale Workers

The background sweeper runs at sweep_interval. During a sweep it:

  • takes a transaction-scoped PostgreSQL advisory lock, so concurrent sweepers do not recover the same jobs twice;
  • finds workers whose heartbeat is older than sweep_threshold;
  • also finds orphan job locks whose locked_by worker is no longer registered;
  • unlocks the jobs held by those workers;
  • decrements the attempt count of each recovered job;
  • releases queue locks for queued jobs;
  • moves run_at forward by recovery_delay;
  • records "Job recovered after worker interruption" as the recovered job error.

If another worker already holds the sweep lock, the sweep exits without recovering jobs.

Manual Sweeps

Use WorkerUtils::sweep_stale_workers when you want an operator command, admin endpoint, or scheduled task to run recovery directly.

use graphile_worker::{SweepStaleWorkersOptions, WorkerUtils};
use std::time::Duration;

let utils = WorkerUtils::new(database, "graphile_worker".to_string());

let result = utils
    .sweep_stale_workers(SweepStaleWorkersOptions {
        sweep_threshold: Some(Duration::from_secs(60)),
        recovery_delay: Some(Duration::from_secs(30)),
        dry_run: false,
    })
    .await?;

println!(
    "recovered {} jobs from {:?}",
    result.recovered_count,
    result.worker_ids
);

SweepStaleWorkersOptions can override sweep_threshold and recovery_delay for a single sweep. Leave either field as None to use the recovery config defaults. Set dry_run: true to return the worker IDs that would be considered dead without recovering jobs or deleting stale worker rows.

When you need custom resilient settings for a manual sweep, pass a WorkerRecoveryConfig explicitly:

use graphile_worker::{SweepStaleWorkersOptions, WorkerRecoveryConfig};
use std::time::Duration;

let config = WorkerRecoveryConfig::default()
    .sweep_threshold(Duration::from_secs(60))
    .resilient_sweep_threshold_multiplier(3);

let result = utils
    .sweep_stale_workers_with_config(&config, SweepStaleWorkersOptions {
        recovery_delay: Some(Duration::from_secs(30)),
        dry_run: false,
        ..Default::default()
    })
    .await?;

Resilient Jobs

Some jobs are expected to run for a long time. Recovery supports a resilient flag so those jobs are not reclaimed as quickly as ordinary work.

By default, a job is resilient when its flags include "infrastructure_resilient": true. Workers holding resilient jobs use:

effective threshold = sweep_threshold * resilient_sweep_threshold_multiplier

The default multiplier is 3.

use graphile_worker::{JobSpec, INFRASTRUCTURE_RESILIENT_FLAG};

let job = utils
    .add_job(
        LongRunningJob { id: 1 },
        JobSpec::builder()
            .flags(vec![INFRASTRUCTURE_RESILIENT_FLAG.to_string()])
            .build(),
    )
    .await?;

You can configure a different flag list:

use graphile_worker::WorkerRecoveryConfig;

let recovery = WorkerRecoveryConfig::default()
    .resilient_job_flags(vec!["custom_resilient".to_string()]);

A configured flag must be present and truthy on the job for the extended threshold to apply.
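
The threshold rule can be expressed as a short sketch. The helpers below are hypothetical and model job flags as a map from flag name to boolean. The crate stores flags on the job row, so the real representation may differ.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// A job counts as resilient when any configured flag is present and true.
fn holds_resilient_job(job_flags: &HashMap<String, bool>, resilient_flags: &[&str]) -> bool {
    resilient_flags
        .iter()
        .any(|flag| job_flags.get(*flag).copied().unwrap_or(false))
}

/// effective threshold = sweep_threshold * multiplier for resilient work.
fn effective_threshold(sweep_threshold: Duration, multiplier: u32, resilient: bool) -> Duration {
    if resilient {
        sweep_threshold * multiplier
    } else {
        sweep_threshold
    }
}

/// A worker is stale once its heartbeat is older than the threshold.
fn is_stale(heartbeat_age: Duration, threshold: Duration) -> bool {
    heartbeat_age > threshold
}
```

With the defaults, a worker holding a resilient job gets a 15 minute threshold, so a heartbeat that is 10 minutes old is not yet stale. The same age is stale for a worker holding only ordinary jobs.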

Recovery Hooks

Recovery hooks let your application decide what to do with each job recovered from a crashed worker. The hook receives the job, the recovering worker ID, the previous worker ID, and the interruption reason.

use graphile_worker::{
    FailureReason, HookRegistry, JobRecovery, JobRecoveryResult, SweepStaleWorkersOptions,
    WorkerUtils,
};
use std::sync::Arc;

let mut hooks = HookRegistry::new();

hooks.on(JobRecovery, |ctx| async move {
    if ctx.reason == FailureReason::WorkerCrashed {
        JobRecoveryResult::Default
    } else {
        JobRecoveryResult::Skip
    }
});

let utils = WorkerUtils::new(database, "graphile_worker".to_string())
    .with_hooks(Arc::new(hooks));

let result = utils
    .sweep_stale_workers(SweepStaleWorkersOptions::default())
    .await?;

The hook result controls recovery for that job:

Result | Effect
JobRecoveryResult::Default | Unlock the job, decrement attempts, and delay it by recovery_delay.
JobRecoveryResult::Reschedule { run_at, attempts } | Unlock the job, set run_at, and optionally replace attempts.
JobRecoveryResult::FailWithBackoff | Fail the job with the normal retry backoff and WorkerCrashed as the error.
JobRecoveryResult::Skip | Leave the job locked and do not count it as recovered.

After a hook handles a job with any result except Skip, the normal JobInterrupted hook is emitted with FailureReason::WorkerCrashed. After a sweep finishes, the WorkerRecovered hook is emitted with the recovered worker IDs and the recovered job count.

Lifecycle Hooks

Lifecycle hooks let you observe worker activity or intercept specific points in the job lifecycle. They are registered through plugins and run inside the worker process that owns the hook registry.

Use hooks for logging, metrics, validation, schedule-time payload adjustment, and local queue diagnostics. Keep hook handlers fast and focused: they run on the worker path they observe or intercept.

Registering Hooks

A plugin implements Plugin and adds handlers to the HookRegistry:

use graphile_worker::{HookRegistry, JobComplete, Plugin, WorkerStart};

struct LoggingPlugin;

impl Plugin for LoggingPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            println!("worker {} started", ctx.worker_id);
        });

        hooks.on(JobComplete, async |ctx| {
            println!("job {} completed in {:?}", ctx.job.id(), ctx.duration);
        });
    }
}

Add plugins when building the worker:

let worker = graphile_worker::WorkerOptions::default()
    .define_job::<ProcessData>()
    .pg_pool(pg_pool)
    .add_plugin(LoggingPlugin)
    .init()
    .await?;

Multiple plugins can be added with repeated .add_plugin(...) calls.

Observer vs Interceptor Hooks

Hooks are split into two categories.

Observer hooks return () and are for side effects. They are useful for metrics, tracing, logging, and state snapshots. When an observer event is emitted, every registered handler for that event is called.

Interceptor hooks return a result that can affect worker behavior. The common result type is HookResult:

use graphile_worker::HookResult;

HookResult::Continue
HookResult::Skip
HookResult::Fail("reason".to_string())

Continue allows the operation to proceed. Skip and Fail(...) are terminal results and stop the interceptor chain.

BeforeJobSchedule uses JobScheduleResult instead. It can continue with a payload, skip scheduling, or fail scheduling:

use graphile_worker::JobScheduleResult;

JobScheduleResult::Continue(payload)
JobScheduleResult::Skip
JobScheduleResult::Fail("reason".to_string())

Schedule interceptors are chained through the payload. If one plugin returns JobScheduleResult::Continue(new_payload), the next schedule interceptor receives that transformed payload. A terminal result stops later schedule interceptors.
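
The chaining rule can be illustrated without the crate. In this sketch, ScheduleResult stands in for JobScheduleResult, the payload is a plain String instead of JSON, and interceptors are synchronous function pointers. All three are simplifications made for the example.

```rust
#[derive(Debug, PartialEq)]
enum ScheduleResult {
    Continue(String),
    Skip,
    Fail(String),
}

type Interceptor = fn(&str) -> ScheduleResult;

/// Run interceptors in order, feeding each one the payload produced by the
/// previous one. The first terminal result (Skip or Fail) stops the chain.
fn run_chain(interceptors: &[Interceptor], payload: String) -> ScheduleResult {
    let mut current = payload;
    for interceptor in interceptors {
        match interceptor(&current) {
            ScheduleResult::Continue(next) => current = next,
            terminal => return terminal,
        }
    }
    ScheduleResult::Continue(current)
}

// Two example interceptors, both hypothetical.
fn reject_blocked(payload: &str) -> ScheduleResult {
    if payload.starts_with("blocked") {
        ScheduleResult::Skip
    } else {
        ScheduleResult::Continue(payload.to_string())
    }
}

fn mark_checked(payload: &str) -> ScheduleResult {
    ScheduleResult::Continue(format!("{payload}+checked"))
}
```

Running reject_blocked and then mark_checked on the payload "job" ends with Continue("job+checked"). A payload starting with "blocked" stops at the first interceptor, so mark_checked never runs.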

Worker Hooks

Worker hooks observe the worker process itself.

Hook | Context highlights | Type
WorkerInit | database, schema, concurrency | observer
WorkerStart | database, worker_id, extensions | observer
WorkerShutdown | database, worker_id, reason | observer
WorkerRecovered | worker_id, dead_worker_ids, jobs_recovered | observer

Example:

use graphile_worker::{HookRegistry, Plugin, WorkerShutdown, WorkerStart};

struct WorkerLogPlugin;

impl Plugin for WorkerLogPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            println!("worker {} started", ctx.worker_id);
        });

        hooks.on(WorkerShutdown, async |ctx| {
            println!("worker {} stopped: {:?}", ctx.worker_id, ctx.reason);
        });
    }
}

Job Run Hooks

Run hooks observe or intercept jobs as they are fetched and executed.

Hook | Context highlights | Type
JobFetch | job, worker_id | observer
BeforeJobRun | job, worker_id, payload | interceptor
JobStart | job, worker_id | observer
AfterJobRun | job, worker_id, result, duration | interceptor
JobComplete | job, worker_id, duration | observer
JobFail | job, worker_id, error, will_retry | observer
JobPermanentlyFail | job, worker_id, error | observer
JobInterrupted | job, worker_id, reason | observer

BeforeJobRun can prevent a task handler from running:

use graphile_worker::{BeforeJobRun, HookRegistry, HookResult, Plugin};

struct ValidationPlugin;

impl Plugin for ValidationPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(BeforeJobRun, async |ctx| {
            let should_skip = ctx
                .payload
                .get("skip")
                .and_then(|value| value.as_bool())
                .unwrap_or(false);

            if should_skip {
                return HookResult::Skip;
            }

            let should_fail = ctx
                .payload
                .get("force_fail")
                .and_then(|value| value.as_bool())
                .unwrap_or(false);

            if should_fail {
                return HookResult::Fail("forced failure by validation hook".into());
            }

            HookResult::Continue
        });
    }
}

When a BeforeJobRun hook skips a job, the task handler is not called. When it fails a job, the task handler is also not called and the normal failure path is used.

AfterJobRun sees the task result and duration. It also returns HookResult, so it can leave the result alone with Continue, mark the job skipped with Skip, or fail it with Fail(...).

Schedule Hooks

BeforeJobSchedule intercepts jobs before they are persisted. It receives the task identifier, JSON payload, and JobSpec.

Use it to reject, validate, or transform scheduled jobs:

use graphile_worker::{BeforeJobSchedule, HookRegistry, JobScheduleResult, Plugin};

struct SchedulePolicyPlugin;

impl Plugin for SchedulePolicyPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(BeforeJobSchedule, async |ctx| {
            if ctx.identifier == "blocked_task" {
                return JobScheduleResult::Skip;
            }

            let mut payload = ctx.payload.clone();
            if let Some(object) = payload.as_object_mut() {
                object.insert("checked_by_hook".into(), serde_json::json!(true));
            }

            JobScheduleResult::Continue(payload)
        });
    }
}

If a schedule hook returns Skip, no job is inserted. If it returns Fail(...), scheduling returns an error. If it returns Continue(payload), that payload is used for the job and passed to any later schedule interceptor.

Cron scheduling also has observer hooks:

Hook | Context highlights | Type
CronTick | timestamp, crontabs | observer
CronJobScheduled | crontab, scheduled_at | observer
BeforeJobSchedule | identifier, payload, spec | interceptor

Local Queue Hooks

Local queue hooks observe the in-process queue used by a worker.

Hook | Context highlights | Type
LocalQueueInit | worker_id | observer
LocalQueueSetMode | worker_id, old_mode, new_mode | observer
LocalQueueGetJobsComplete | worker_id, jobs_count | observer
LocalQueueReturnJobs | worker_id, jobs_count | observer
LocalQueueRefetchDelayStart | worker_id, duration, threshold, abort_threshold | observer
LocalQueueRefetchDelayAbort | worker_id, count, abort_threshold | observer
LocalQueueRefetchDelayExpired | worker_id | observer

The local queue mode values are Starting, Polling, Waiting, TtlExpired, and Released.

Example:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use graphile_worker::{
    HookRegistry, LocalQueueGetJobsComplete, LocalQueueSetMode, Plugin,
};

#[derive(Clone)]
struct QueueMetricsPlugin {
    max_fetched: Arc<AtomicUsize>,
}

impl Plugin for QueueMetricsPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        let max_fetched = self.max_fetched.clone();
        hooks.on(LocalQueueGetJobsComplete, move |ctx| {
            let max_fetched = max_fetched.clone();
            async move {
                max_fetched.fetch_max(ctx.jobs_count, Ordering::Relaxed);
            }
        });

        hooks.on(LocalQueueSetMode, async |ctx| {
            println!(
                "local queue mode changed: {:?} -> {:?}",
                ctx.old_mode,
                ctx.new_mode
            );
        });
    }
}

Recovery Hook

JobRecovery is an interceptor for recovered jobs. Its context includes the job, the current worker_id, the previous_worker_id, and the recovery reason.

It returns JobRecoveryResult:

use graphile_worker::JobRecoveryResult;

JobRecoveryResult::Default
JobRecoveryResult::Reschedule {
    run_at,
    attempts: Some(3),
}
JobRecoveryResult::FailWithBackoff
JobRecoveryResult::Skip

Use Default to keep the worker's standard recovery behavior. Use the other variants only when the recovered job needs explicit policy.

Practical Example

This plugin combines observer hooks for metrics with an interceptor for payload-based validation:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use graphile_worker::{
    BeforeJobRun, HookRegistry, HookResult, JobComplete, JobFail, JobStart, Plugin,
};

#[derive(Default)]
struct Counters {
    started: AtomicU64,
    completed: AtomicU64,
    failed: AtomicU64,
}

struct MetricsAndValidationPlugin {
    counters: Arc<Counters>,
}

impl Plugin for MetricsAndValidationPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        let counters = self.counters.clone();
        hooks.on(JobStart, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.started.fetch_add(1, Ordering::Relaxed);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobComplete, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.completed.fetch_add(1, Ordering::Relaxed);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobFail, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.failed.fetch_add(1, Ordering::Relaxed);
            }
        });

        hooks.on(BeforeJobRun, async |ctx| {
            if ctx
                .payload
                .get("skip")
                .and_then(|value| value.as_bool())
                .unwrap_or(false)
            {
                return HookResult::Skip;
            }

            HookResult::Continue
        });
    }
}

See the repository examples in examples/hooks.rs and examples/hooks/*.rs for a runnable version with logging, metrics, validation, and sample jobs.

Job Management

WorkerUtils is the operational API for managing Graphile Worker jobs from Rust code. Use it to schedule work, cancel keyed jobs, complete or fail selected jobs, reschedule jobs, clean up old metadata, run migrations, and recover locks left behind by stopped workers.

Most applications get a WorkerUtils instance from an initialized worker:

let worker = graphile_worker::WorkerOptions::default()
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

let utils = worker.create_utils();

Operator tools that do not run a worker can construct utilities directly from a database handle and schema:

use graphile_worker::WorkerUtils;

let utils = WorkerUtils::new(database, "graphile_worker");
utils.migrate().await?;

Scheduling Jobs

Use add_job when the task type is known at compile time. The task handler's IDENTIFIER is used as the job identifier, and the payload is serialized to JSON.

use graphile_worker::{IntoTaskHandlerResult, JobSpec, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        Ok::<(), String>(())
    }
}

let job = utils
    .add_job(
        SendEmail {
            to: "user@example.com".to_string(),
        },
        JobSpec::default(),
    )
    .await?;

Use add_raw_job when the identifier is dynamic or when building an operator tool that schedules jobs without linking the task type.

use graphile_worker::JobSpec;
use serde_json::json;

let job = utils
    .add_raw_job(
        "send_email",
        json!({ "to": "user@example.com" }),
        JobSpec::default(),
    )
    .await?;

WorkerUtils also supports batch insertion:

let spec = JobSpec::default();
let jobs = vec![
    (SendEmail { to: "a@example.com".to_string() }, &spec),
    (SendEmail { to: "b@example.com".to_string() }, &spec),
];

let added = utils.add_jobs(&jobs).await?;

For mixed identifiers, use add_raw_jobs with RawJobSpec values:

use graphile_worker::{JobSpec, RawJobSpec};
use serde_json::json;

let added = utils
    .add_raw_jobs(&[
        RawJobSpec {
            identifier: "send_email".to_string(),
            payload: json!({ "to": "a@example.com" }),
            spec: JobSpec::default(),
        },
        RawJobSpec {
            identifier: "generate_report".to_string(),
            payload: json!({ "report_id": 42 }),
            spec: JobSpec::default(),
        },
    ])
    .await?;

Empty add_jobs and add_raw_jobs calls return an empty vector without writing to the database. For large batches, the utility attempts to run ANALYZE on the jobs table after inserting at least 10,000 jobs.

Job Keys

Set JobSpec::job_key to deduplicate work. Adding another job with the same key updates the existing job instead of creating another row. In the tested behavior, the later payload replaces the earlier payload and the job revision increments.

use graphile_worker::{JobKeyMode, JobSpec};

utils
    .add_job(
        SendEmail {
            to: "user@example.com".to_string(),
        },
        JobSpec {
            job_key: Some("welcome-email:user-123".to_string()),
            job_key_mode: Some(JobKeyMode::Replace),
            ..Default::default()
        },
    )
    .await?;

JobKeyMode::PreserveRunAt keeps the existing scheduled time when the keyed job is updated. JobKeyMode::Replace replaces it with the new run_at. JobKeyMode::UnsafeDedupe is supported for individual add_job and add_raw_job calls, but batch insertion rejects it. If a batch contains any PreserveRunAt job key mode, the batch insert applies preserve-run-at behavior uniformly.
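As a rough illustration of the Replace and PreserveRunAt rules, the following in-memory model mimics a keyed upsert. It is a sketch only: KeyMode, KeyedJob, and KeyedStore are hypothetical names, timestamps are plain integers, the starting revision of 0 is an assumption, and UnsafeDedupe is left out because its update semantics are not described here.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the job key modes discussed above.
#[derive(Clone, Copy)]
enum KeyMode {
    Replace,
    PreserveRunAt,
}

#[derive(Debug, Clone, PartialEq)]
struct KeyedJob {
    payload: String,
    run_at: u64,   // seconds; a plain integer stands in for a timestamp
    revision: u32, // assumed to start at 0 for a new job
}

struct KeyedStore {
    jobs: HashMap<String, KeyedJob>,
}

impl KeyedStore {
    fn new() -> Self {
        KeyedStore { jobs: HashMap::new() }
    }

    /// Inserts a new keyed job or updates the existing one in place.
    fn upsert(&mut self, key: &str, payload: &str, run_at: u64, mode: KeyMode) -> KeyedJob {
        let job = self
            .jobs
            .entry(key.to_string())
            .and_modify(|existing| {
                // The later payload always wins and the revision increments.
                existing.payload = payload.to_string();
                existing.revision += 1;
                // Only Replace moves the scheduled time.
                if matches!(mode, KeyMode::Replace) {
                    existing.run_at = run_at;
                }
            })
            .or_insert_with(|| KeyedJob {
                payload: payload.to_string(),
                run_at,
                revision: 0,
            });
        job.clone()
    }
}
```

Calling upsert twice with the same key leaves one row in the map; only the Replace call changes run_at.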

Cancel a keyed job with remove_job:

utils.remove_job("welcome-email:user-123").await?;

Batch Task Jobs

For handlers that implement BatchTaskHandler, add_batch_job stores the payload as a JSON array in a single job row.

let job = utils
    .add_batch_job(
        vec![
            SendEmail { to: "a@example.com".to_string() },
            SendEmail { to: "b@example.com".to_string() },
        ],
        JobSpec::default(),
    )
    .await?;

The payload list must contain at least one item. If a BeforeJobSchedule hook is installed, it must return a JSON array for batch jobs and must not return an empty array.

Completing, Failing, and Rescheduling Jobs

Administrative job actions take job ids and return the jobs that were changed. Locked jobs are left untouched by complete_jobs, permanently_fail_jobs, and reschedule_jobs.

let completed = utils.complete_jobs(&[101, 102]).await?;

let failed = utils
    .permanently_fail_jobs(&[103], "invalid payload")
    .await?;

Permanent failure sets last_error to the supplied reason and sets attempts to max_attempts.
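The effect on a row can be pictured with a small in-memory model. This is a sketch under stated assumptions, not the database function: JobRow and permanently_fail are hypothetical names, and the field types are chosen for illustration only.

```rust
// Hypothetical in-memory row; the real state lives in PostgreSQL.
#[derive(Debug, Clone, PartialEq)]
struct JobRow {
    id: i64,
    attempts: i16,
    max_attempts: i16,
    last_error: Option<String>,
    locked_by: Option<String>,
}

/// Marks the selected unlocked jobs as permanently failed and returns the ids
/// that were changed. Locked jobs are skipped, mirroring the rule above.
fn permanently_fail(jobs: &mut [JobRow], ids: &[i64], reason: &str) -> Vec<i64> {
    let mut changed = Vec::new();
    for job in jobs.iter_mut() {
        if ids.contains(&job.id) && job.locked_by.is_none() {
            job.attempts = job.max_attempts; // no retries remain
            job.last_error = Some(reason.to_string());
            changed.push(job.id);
        }
    }
    changed
}
```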

Use RescheduleJobOptions to update only the fields you need:

use chrono::Utc;
use graphile_worker::worker_utils::types::RescheduleJobOptions;

let rescheduled = utils
    .reschedule_jobs(
        &[104, 105],
        RescheduleJobOptions {
            run_at: Some(Utc::now() + chrono::Duration::minutes(5)),
            priority: Some(10),
            attempts: Some(0),
            max_attempts: Some(25),
        },
    )
    .await?;

If run_at is not supplied, the database function schedules matching jobs to run immediately. priority, attempts, and max_attempts are optional and are left unchanged when omitted.
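A minimal sketch of that merge rule, using hypothetical types (RescheduleSketch, JobFields) and integer timestamps in place of the real options struct and database function:

```rust
// Hypothetical mirror of the optional reschedule fields described above.
#[derive(Default)]
struct RescheduleSketch {
    run_at: Option<u64>,
    priority: Option<i16>,
    attempts: Option<i16>,
    max_attempts: Option<i16>,
}

#[derive(Debug, Clone, PartialEq)]
struct JobFields {
    run_at: u64,
    priority: i16,
    attempts: i16,
    max_attempts: i16,
}

/// Applies the options to one job. A missing run_at means "run now";
/// every other missing field leaves the stored value unchanged.
fn apply_reschedule(job: &mut JobFields, options: &RescheduleSketch, now: u64) {
    job.run_at = options.run_at.unwrap_or(now);
    if let Some(priority) = options.priority {
        job.priority = priority;
    }
    if let Some(attempts) = options.attempts {
        job.attempts = attempts;
    }
    if let Some(max_attempts) = options.max_attempts {
        job.max_attempts = max_attempts;
    }
}
```

With this model, passing only a priority still moves run_at to the current time, which is worth remembering when the intent is to reprioritize a job scheduled for the future.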

Cleanup

cleanup runs one or more maintenance tasks:

use graphile_worker::worker_utils::types::CleanupTask;

utils
    .cleanup(&[
        CleanupTask::GcTaskIdentifiers,
        CleanupTask::GcJobQueues,
        CleanupTask::DeletePermanentlyFailedJobs,
    ])
    .await?;

The available cleanup tasks are:

  • GcTaskIdentifiers: removes task identifiers no longer referenced by jobs.
  • GcJobQueues: removes queue records no longer referenced by jobs.
  • DeletePermanentlyFailedJobs: deletes unlocked jobs whose attempts have reached max_attempts.

When utilities come from a worker, GcTaskIdentifiers preserves task identifiers known by that worker and refreshes its task details afterward. This keeps horizontally scaled workers able to pick up newly scheduled jobs for registered task handlers after cleanup.
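The preservation rule can be expressed as a set operation. The function below is an illustrative model only (gc_task_identifiers is a hypothetical name and the sets stand in for database tables): an identifier survives cleanup if a job still references it or if the calling worker has it registered.

```rust
use std::collections::BTreeSet;

/// Returns the task identifiers that remain after garbage collection.
/// `stored` models the identifier table, `referenced_by_jobs` the identifiers
/// still used by job rows, and `known_by_worker` the handlers registered on
/// the worker that created the utilities (empty for standalone utilities).
fn gc_task_identifiers(
    stored: &BTreeSet<String>,
    referenced_by_jobs: &BTreeSet<String>,
    known_by_worker: &BTreeSet<String>,
) -> BTreeSet<String> {
    stored
        .iter()
        .filter(|id| referenced_by_jobs.contains(*id) || known_by_worker.contains(*id))
        .cloned()
        .collect()
}
```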

Worker Recovery Operations

For deployments using worker recovery, WorkerUtils can inspect heartbeat workers and run recovery sweeps manually.

use std::time::Duration;

let workers = utils
    .list_active_workers(Duration::from_secs(300))
    .await?;

Run a stale-worker sweep to recover jobs locked by workers that stopped heartbeating, and jobs or queues locked by worker ids that are no longer registered:

use graphile_worker::SweepStaleWorkersOptions;
use std::time::Duration;

let result = utils
    .sweep_stale_workers(SweepStaleWorkersOptions {
        sweep_threshold: Some(Duration::from_secs(300)),
        recovery_delay: Some(Duration::from_secs(30)),
        dry_run: true,
        ..Default::default()
    })
    .await?;

Use dry_run: true for inspection before making changes. When you need the sweep to use a specific recovery configuration, call sweep_stale_workers_with_config.

force_unlock_workers is a direct unlock tool for known worker ids:

utils
    .force_unlock_workers(&["graphile_worker_deadbeef"])
    .await?;

It clears locks for jobs and queues held by those workers while leaving other workers' locks alone.

Migrations

Operator processes can run schema migrations through utilities:

utils.migrate().await?;

This is the same migration path exposed by the command-line tool.

Command-Line Operations

The graphile-worker binary wraps the same operational concepts for shell workflows. It connects with --database-url or DATABASE_URL and defaults to the graphile_worker schema.

graphile-worker --database-url postgres://postgres:postgres@localhost/postgres migrate
DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker add send_email --payload '{"to":"user@example.com"}'
graphile-worker list --state ready
graphile-worker complete 123 124
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker remove cli-job-key
graphile-worker cleanup
graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s
graphile-worker sweep-stale-workers --dry-run

Use the Rust API from application code and purpose-built operator services. Use the CLI for ad hoc inspection, maintenance, and recovery tasks.

Operations

This section covers the surfaces you use after Graphile Worker RS is running in an application: schema migrations, the command-line tool, the admin UI, worker shutdown, recovery, and production readiness checks.

Use these pages when you are preparing a deployment or responding to a queue incident:

  • CLI for adding, listing, completing, failing, rescheduling, removing, cleaning up, unlocking, and inspecting jobs.
  • Admin UI for browser-based queue inspection and controlled maintenance actions.
  • Migrations for installing and updating the PostgreSQL schema.
  • Deployment for running workers outside local development.
  • Observability for production visibility.

Operational surfaces

Graphile Worker RS exposes three main operational entry points.

Worker runtime

The worker process owns job execution. Production configuration normally starts with a PostgreSQL pool, a schema name, registered task handlers, concurrency, and shutdown behavior:

use graphile_worker::{WorkerOptions, WorkerShutdownConfig};
use std::time::Duration;

let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(true)
    .grace_period(Duration::from_secs(5))
    .interrupted_job_retry_delay(Duration::from_secs(30));

let worker = WorkerOptions::default()
    .concurrency(5)
    .schema("graphile_worker")
    .worker_shutdown(shutdown)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

worker.run().await?;

If your host application already owns shutdown, disable the built-in OS signal listeners and pass the host shutdown future to the worker. The worker still handles graceful draining, local queue release, batcher flushes, and configured grace-period behavior for in-flight jobs.

Command-line tool

The installed binary is graphile-worker. It connects through --database-url or DATABASE_URL and uses the graphile_worker schema by default.

graphile-worker --database-url postgres://postgres:postgres@localhost/postgres migrate
DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker add send_email --payload '{"to":"user@example.com"}'
graphile-worker list --state ready
graphile-worker show 123
graphile-worker complete 123 124
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker remove cli-job-key
graphile-worker cleanup
graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker stats
graphile-worker queues
graphile-worker workers

For stale worker recovery, dry-run first so you can see which workers would be recovered:

graphile-worker sweep-stale-workers --dry-run
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s

Admin UI

The native admin UI is an Axum server. It serves the browser UI and protected API routes for:

  • session metadata
  • queue overview, stats, queues, locked workers, and active workers
  • job listing and single-job inspection
  • adding raw jobs
  • completing, permanently failing, running now, and rescheduling jobs
  • removing a job by key
  • maintenance actions: migrate, cleanup, force unlock, and sweep stale workers

When embedding the admin UI directly, AdminServerConfig defaults to 127.0.0.1:4000, uses the graphile_worker schema, enables write actions, and uses Basic auth with a random password for the admin user. The graphile-worker admin CLI command overrides the listen default to 127.0.0.1:5678. Authentication modes available in the server configuration are Basic, Bearer token, custom header token, and no auth. No-auth mode is rejected on non-loopback listen addresses.

Run write-capable admin UI instances behind an access-controlled boundary. The server adds no-store cache headers for dynamic routes, caches static assets for one hour, sets X-Content-Type-Options: nosniff, denies framing, uses Referrer-Policy: no-referrer, and applies a Content Security Policy. API write methods also require the admin CSRF token.

Use read-only mode for shared inspection surfaces. In read-only mode, write routes return 403 instead of mutating jobs or running maintenance.

Migrations

Migrations install and update the Graphile Worker PostgreSQL schema. The migration runner:

  • creates the schema and migrations table if they are missing
  • checks that PostgreSQL is version 12 or newer
  • runs pending migrations in order inside transactions
  • records each migration id and whether it is breaking
  • rejects a database revision that includes a breaking migration newer than the current crate supports
  • warns, but continues, when the database has a newer non-breaking revision than the current crate supports
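The last two rules amount to a small compatibility decision. The sketch below models it with hypothetical names (AppliedMigration, Compatibility, check_compatibility); it assumes migration ids are ordered integers and does not reproduce the crate's actual migration runner.

```rust
// Hypothetical record of a migration already applied to the database.
struct AppliedMigration {
    id: u32,
    breaking: bool,
}

#[derive(Debug, PartialEq)]
enum Compatibility {
    Supported,      // database is at or behind what the crate knows
    WarnNewer,      // newer non-breaking migrations: warn and continue
    RejectBreaking, // a newer breaking migration: refuse to run
}

/// Compares applied migrations against the newest id the crate supports.
fn check_compatibility(applied: &[AppliedMigration], latest_supported: u32) -> Compatibility {
    let newer: Vec<&AppliedMigration> = applied
        .iter()
        .filter(|migration| migration.id > latest_supported)
        .collect();

    if newer.iter().any(|migration| migration.breaking) {
        Compatibility::RejectBreaking
    } else if !newer.is_empty() {
        Compatibility::WarnNewer
    } else {
        Compatibility::Supported
    }
}
```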

Run migrations before starting workers after deploys that may include schema changes:

graphile-worker migrate

Migration 11 has a specific safety check: if locked jobs are present and PostgreSQL returns error code 22012, migration fails with guidance to shut workers down cleanly and unlock locked jobs and queues before retrying.

Recovery and locked jobs

Worker recovery is opt-in. When enabled, workers register heartbeat rows in PostgreSQL and refresh last_heartbeat_at on a configured interval. A sweeper can recover jobs held by workers that stopped heartbeating, and jobs or queues locked by worker ids that are no longer registered.

use graphile_worker::{WorkerOptions, WorkerRecoveryConfig};
use std::time::Duration;

let recovery = WorkerRecoveryConfig::default()
    .enabled(true)
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30));

let worker = WorkerOptions::default()
    .worker_recovery(recovery)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Recovered jobs are unlocked, their attempt count is decremented, their queue lock is released, and run_at is delayed by the configured recovery delay. Sweeps use a transaction-scoped PostgreSQL advisory lock so only one sweeper performs recovery at a time.
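To make the recovery step concrete, here is a hedged standalone model. It is not the sweeper's implementation: is_stale, LockedJob, and recover are hypothetical names, timestamps are integer seconds, and three details are assumptions made for illustration: staleness uses a strict "older than" comparison, the attempt count goes down by exactly one, and the delay is measured from the sweep time.

```rust
/// A worker is treated as stale when its last heartbeat is older than the
/// sweep threshold (strict comparison is an assumption of this sketch).
fn is_stale(last_heartbeat_at: u64, now: u64, sweep_threshold_secs: u64) -> bool {
    now.saturating_sub(last_heartbeat_at) > sweep_threshold_secs
}

// Hypothetical view of a job that was locked by a stale worker.
#[derive(Debug, PartialEq)]
struct LockedJob {
    attempts: u32,
    locked_by: Option<String>,
    run_at: u64,
}

/// Releases the lock, gives the interrupted attempt back, and delays the job.
fn recover(job: &mut LockedJob, now: u64, recovery_delay_secs: u64) {
    job.locked_by = None;
    job.attempts = job.attempts.saturating_sub(1); // assumed: one attempt returned
    job.run_at = now + recovery_delay_secs; // assumed: delay counted from the sweep
}
```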

Manual recovery is available even when automatic recovery is disabled:

use graphile_worker::SweepStaleWorkersOptions;
use std::time::Duration;

let result = worker
    .create_utils()
    .sweep_stale_workers(SweepStaleWorkersOptions {
        sweep_threshold: Some(Duration::from_secs(300)),
        recovery_delay: Some(Duration::from_secs(30)),
        dry_run: true,
        ..Default::default()
    })
    .await?;

println!("Would recover workers: {:?}", result.worker_ids);

Use force-unlock only when you have identified worker ids that should no longer own locks:

graphile-worker workers
graphile-worker force-unlock graphile_worker_deadbeef

Local queue considerations

The local queue reduces database round trips by batch-fetching jobs and caching them in the worker process. It has operational implications:

  • unclaimed jobs are returned to PostgreSQL on shutdown or when the local queue TTL expires
  • refetch delay can reduce thundering-herd behavior when queues are empty
  • cache size and refill threshold should be chosen with worker concurrency and database load in mind

use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;

let worker = WorkerOptions::default()
    .local_queue(
        LocalQueueConfig::default()
            .with_size(100)
            .with_ttl(Duration::from_secs(300))
            .with_refetch_delay(
                RefetchDelayConfig::default()
                    .with_duration(Duration::from_millis(100))
                    .with_threshold(10),
            ),
    )
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Production readiness checklist

Before sending production traffic to workers, verify the following:

  • PostgreSQL is version 12 or newer.
  • The database URL is configured through DATABASE_URL or an equivalent application secret.
  • Migrations are run before workers start processing jobs.
  • All deployed workers agree on the Graphile Worker schema name.
  • Worker task handlers are registered for every task identifier you enqueue.
  • Concurrency, PostgreSQL pool size, and local queue size are sized together.
  • Shutdown behavior is wired into your process manager or host application.
  • Recovery is enabled when the deployment model can leave jobs locked after crashes, network partitions, forced aborts, or orchestrator shutdowns.
  • Admin UI access uses Basic, Bearer, or header auth when exposed beyond loopback.
  • Admin UI read-only mode is used for inspection-only deployments.
  • Maintenance commands such as cleanup, force-unlock, and sweep-stale-workers are restricted to operators who can safely mutate queue state.
  • Stale-worker sweeps are dry-run before recovery during incident response.
  • Cleanup policy is understood before permanently failed jobs, task identifiers, or job queues are garbage-collected.
  • Observability covers worker startup, job failures, locked workers, queue backlog, and migration failures.

CLI

The graphile-worker binary provides operational access to a Graphile Worker PostgreSQL database. It can run migrations, add and inspect jobs, mutate job state, run maintenance tasks, print queue and worker summaries, and serve the embedded admin UI.

Most commands mutate the configured schema directly. Use the same database URL and schema that your workers use.

Connection Options

Every command accepts these global options:

graphile-worker \
  --database-url postgres://postgres:postgres@localhost/postgres \
  --schema graphile_worker \
  --max-connections 5 \
  --json \
  stats

| Option | Default | Description |
| --- | --- | --- |
| --database-url | DATABASE_URL | PostgreSQL connection URL. The CLI exits if neither is set. |
| --schema | GRAPHILE_WORKER_SCHEMA, then graphile_worker | Graphile Worker schema name. |
| --max-connections | 5 | Maximum PostgreSQL connections used by the CLI pool. |
| --json | disabled | Print machine-readable JSON where the command supports it. |

Migrations

Run migrations before using a fresh database:

graphile-worker --database-url "$DATABASE_URL" migrate

With JSON output:

graphile-worker --database-url "$DATABASE_URL" --json migrate

The JSON form prints:

{
  "migrated": true
}

Add Jobs

Add a job by task identifier. If no payload is provided, the payload defaults to an empty JSON object.

graphile-worker add send_email

Pass JSON directly:

graphile-worker add send_email \
  --payload '{"to":"user@example.com"}'

Read JSON from a file:

graphile-worker add send_email \
  --payload-file payload.json

Set common scheduling and routing fields:

graphile-worker add send_email \
  --payload '{"to":"user@example.com"}' \
  --queue emails \
  --run-at 2026-01-02T03:04:05Z \
  --max-attempts 5 \
  --priority 10 \
  --flag transactional

--run-at accepts an RFC 3339 timestamp or now. Lower priority values run sooner.

Use job keys for deduplication or replacement:

graphile-worker add send_email \
  --key user-123-welcome \
  --job-key-mode replace

--job-key-mode requires --key. Supported modes are:

| Mode | CLI value |
| --- | --- |
| Replace | replace |
| Preserve run time | preserve-run-at |
| Unsafe dedupe | unsafe-dedupe |

--payload and --payload-file cannot be used together. Payload input must be valid JSON.
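The argument rules for add can be summarized as a validation pass. This is a sketch with hypothetical names (AddArgs, check_add_args), not the CLI's own parser; JSON validation is omitted to keep the example dependency-free.

```rust
// Hypothetical view of the parsed `add` arguments relevant to validation.
#[derive(Default)]
struct AddArgs {
    payload: Option<String>,
    payload_file: Option<String>,
    key: Option<String>,
    job_key_mode: Option<String>,
}

/// Checks the combinations described above and returns the first problem found.
fn check_add_args(args: &AddArgs) -> Result<(), String> {
    if args.payload.is_some() && args.payload_file.is_some() {
        return Err("--payload and --payload-file cannot be used together".to_string());
    }
    if let Some(mode) = &args.job_key_mode {
        if args.key.is_none() {
            return Err("--job-key-mode requires --key".to_string());
        }
        let supported = ["replace", "preserve-run-at", "unsafe-dedupe"];
        if !supported.contains(&mode.as_str()) {
            return Err(format!("unsupported job key mode: {mode}"));
        }
    }
    Ok(())
}
```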

Inspect Jobs

List jobs:

graphile-worker list

Filter by state, task, queue, and pagination:

graphile-worker list \
  --state ready \
  --identifier send_email \
  --queue emails \
  --limit 25 \
  --offset 0

Supported states are all, ready, scheduled, locked, and failed. --limit and --offset must be greater than or equal to zero.

Show one job by id:

graphile-worker show 123

Human-readable list output is tab-separated. Human-readable show output includes the job payload pretty-printed as JSON. Add --json to either command for structured output.

Mutate Jobs

Complete one or more jobs:

graphile-worker complete 123 124

Permanently fail jobs:

graphile-worker fail 125 --reason "invalid payload"

If --reason is omitted, the CLI uses Manually marked as failed.

Reschedule jobs or update retry metadata:

graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker reschedule 127 --now
graphile-worker reschedule 128 --priority 7
graphile-worker reschedule 129 --attempts 1 --max-attempts 5

reschedule requires at least one of --now, --run-at, --priority, --attempts, or --max-attempts.

Remove a job by job key:

graphile-worker remove user-123-welcome

Overview Commands

Print queue-wide job counts:

graphile-worker stats

List queues and queue lock state:

graphile-worker queues

List worker ids that currently hold job or queue locks:

graphile-worker workers

These commands also support --json.

Maintenance

Run cleanup tasks:

graphile-worker cleanup

When no task is passed, all cleanup tasks run. To run selected tasks:

graphile-worker cleanup delete-permanently-failed-jobs
graphile-worker cleanup gc-task-identifiers gc-job-queues

Force unlock jobs and queues locked by worker ids:

graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker force-unlock worker-a worker-b

Recover stale workers and orphan locks:

graphile-worker sweep-stale-workers
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s

--sweep-threshold is the time since last heartbeat before a worker is deemed inactive. --recovery-delay is the delay before recovered jobs are eligible to run again. Durations may be plain seconds or use second, minute, or hour units, for example 300, 30s, 5m, or 2h.
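For scripts that need to produce or check these values, the accepted duration shapes can be parsed along the following lines. parse_duration_arg is a hypothetical helper written for this page, not the CLI's parser, and it accepts only the forms listed above: plain seconds or a whole number followed by s, m, or h.

```rust
use std::time::Duration;

/// Parses "300", "30s", "5m", or "2h" into a Duration.
fn parse_duration_arg(input: &str) -> Result<Duration, String> {
    let input = input.trim();
    // Pick the numeric part and the seconds-per-unit multiplier.
    let (digits, multiplier): (&str, u64) = match input.chars().last() {
        Some('s') => (&input[..input.len() - 1], 1),
        Some('m') => (&input[..input.len() - 1], 60),
        Some('h') => (&input[..input.len() - 1], 3600),
        Some(c) if c.is_ascii_digit() => (input, 1),
        _ => return Err(format!("unsupported duration: {input:?}")),
    };
    let value: u64 = digits
        .parse()
        .map_err(|_| format!("invalid duration number: {input:?}"))?;
    value
        .checked_mul(multiplier)
        .map(Duration::from_secs)
        .ok_or_else(|| format!("duration too large: {input:?}"))
}
```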

Use dry-run mode to see which stale workers would be recovered without unlocking or rescheduling jobs:

graphile-worker sweep-stale-workers --dry-run

Admin UI

Serve the embedded admin UI and JSON management API:

graphile-worker admin

By default it listens on 127.0.0.1:5678 and uses HTTP Basic auth with username admin. If no password is provided, the CLI generates one and prints it at startup.

Useful options:

graphile-worker admin \
  --listen 127.0.0.1:5678 \
  --auth basic \
  --username admin \
  --password "$GRAPHILE_WORKER_ADMIN_PASSWORD"

Authentication modes are basic, bearer, header, and none.

| Mode | Options and environment |
| --- | --- |
| basic | --username, --password, or GRAPHILE_WORKER_ADMIN_PASSWORD |
| bearer | --bearer-token or GRAPHILE_WORKER_ADMIN_BEARER_TOKEN |
| header | --header-name, --header-token, or GRAPHILE_WORKER_ADMIN_HEADER_TOKEN |
| none | Allowed only when --listen uses a loopback address |

For read-only operation, disable mutating admin actions:

graphile-worker admin --read-only

Operational Caveats

  • The CLI opens a PostgreSQL connection pool for every command, including the long-running admin server. Tune --max-connections for operational environments.
  • --json is intended for automation. Human-readable tables are tab-separated and may be easier to inspect in a terminal.
  • complete, fail, reschedule, remove, cleanup, force-unlock, sweep-stale-workers, and migrate change database state.
  • force-unlock unlocks by worker id. Prefer sweep-stale-workers --dry-run first when recovering from stale workers, then run without --dry-run after checking the listed worker ids.
  • --auth none for the admin UI is rejected unless the listen address is loopback.

Admin UI

Graphile Worker RS includes an embedded admin UI for inspecting queues, jobs, and workers, plus a JSON API for the same operations. The implementation is split across three crates:

  • graphile_worker_admin_api defines shared request and response types, and SQLx-backed read queries when built with the sqlx feature.
  • graphile_worker_admin_ui serves the native Axum application, the HTML page, embedded CSS, JavaScript, WASM, and JSON API routes.
  • graphile_worker_admin_ui_client contains the WASM client that runs in the browser.

The CLI uses these crates to expose the graphile-worker admin command.

Starting the Server

The admin command starts a native HTTP server and connects it to the same Postgres schema and WorkerUtils used by the rest of the CLI.

graphile-worker admin

By default it listens on 127.0.0.1:5678, uses HTTP Basic authentication with username admin, and generates a password when none is provided.

Useful options include:

graphile-worker admin --listen 127.0.0.1:5678
graphile-worker admin --auth basic --username admin
graphile-worker admin --auth bearer
graphile-worker admin --auth header --header-name x-graphile-worker-admin-token
graphile-worker admin --read-only

The CLI also reads these environment variables for configured secrets:

GRAPHILE_WORKER_ADMIN_PASSWORD="<password>" graphile-worker admin --auth basic
GRAPHILE_WORKER_ADMIN_BEARER_TOKEN="<token>" graphile-worker admin --auth bearer
GRAPHILE_WORKER_ADMIN_HEADER_TOKEN="<token>" graphile-worker admin --auth header

When a Basic password, bearer token, or header token is generated, the CLI prints it at startup. Configured secrets are not printed.

Native Embedding

Applications can build the Axum server directly with graphile_worker_admin_ui::AdminServerConfig and graphile_worker_admin_ui::serve.

use graphile_worker::{Schema, WorkerUtils};
use graphile_worker_admin_ui::{AdminAuthConfig, AdminServerConfig};
use sqlx::PgPool;

async fn serve_admin(pool: PgPool, utils: WorkerUtils) -> anyhow::Result<()> {
    let config = AdminServerConfig::builder(pool, utils)
        .schema(Schema::default())
        .schema_name("graphile_worker")
        .listen_addr("127.0.0.1:5678".parse()?)
        .auth(AdminAuthConfig::basic_with_random_password("admin"))
        .read_only(false)
        .build()?;

    graphile_worker_admin_ui::serve(config).await?;
    Ok(())
}

When embedding the admin UI directly, the config builder defaults to:

  • schema Schema::default()
  • schema name graphile_worker
  • listen address 127.0.0.1:4000
  • generated Basic auth for username admin
  • read-write mode

Authentication

The server supports four auth modes:

| Mode | Behavior |
| --- | --- |
| basic | Requires HTTP Basic credentials. This is the default CLI mode. |
| bearer | Requires Authorization: Bearer <token>. |
| header | Requires a configured header name whose value matches the token. |
| none | Requires no auth, but is only allowed on loopback listen addresses. |

Both the CLI auth builder and the native config validation reject unauthenticated servers bound to non-loopback addresses.
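A check of this kind can be written with the standard library alone. The sketch below is illustrative: AuthMode and validate_admin_listen are hypothetical names rather than the admin crate's types, and the real validation may report errors differently.

```rust
use std::net::SocketAddr;

// Hypothetical stand-in for the four auth modes.
#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq)]
enum AuthMode {
    Basic,
    Bearer,
    Header,
    NoAuth,
}

/// Rejects an unauthenticated server unless it binds to a loopback address.
fn validate_admin_listen(auth: AuthMode, listen: SocketAddr) -> Result<(), String> {
    if auth == AuthMode::NoAuth && !listen.ip().is_loopback() {
        return Err(format!(
            "auth mode none is not allowed on non-loopback address {listen}"
        ));
    }
    Ok(())
}
```

Note that 0.0.0.0 is not a loopback address, so binding to all interfaces without authentication fails this check.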

For Basic auth, the server also applies page authentication to the browser page itself. API routes are authenticated in every mode except none.

CSRF and Security Headers

The server generates a CSRF token when it builds application state. Every mutating API request must include that token in the x-graphile-worker-admin-csrf header. The browser client reads the header name and token from the rendered page/session data and sends it on writes.
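The write-side check might look roughly like this. It is a sketch, not the server's middleware: csrf_allows and constant_time_eq are hypothetical helpers, treating GET, HEAD, and OPTIONS as non-mutating is an assumption, and the constant-time comparison is a common precaution rather than something this page documents.

```rust
const CSRF_HEADER: &str = "x-graphile-worker-admin-csrf";

/// Compares two byte strings without stopping at the first difference.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

/// Returns true when the request may proceed. Read methods pass through;
/// any other method must carry the expected token in the CSRF header.
fn csrf_allows(method: &str, headers: &[(&str, &str)], expected_token: &str) -> bool {
    if matches!(method, "GET" | "HEAD" | "OPTIONS") {
        return true;
    }
    headers
        .iter()
        .find(|(name, _)| name.eq_ignore_ascii_case(CSRF_HEADER))
        .map(|(_, value)| constant_time_eq(value.as_bytes(), expected_token.as_bytes()))
        .unwrap_or(false)
}
```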

The server adds security headers to all responses:

  • Cache-Control: no-store for pages and API responses
  • Cache-Control: public, max-age=3600 for /assets/* and /favicon.ico
  • X-Content-Type-Options: nosniff
  • X-Frame-Options: DENY
  • Referrer-Policy: no-referrer
  • a content security policy limited to same-origin assets and API calls
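The header policy above can be modeled as a function of the request path. The names below (cache_control_for, security_headers) are hypothetical, and the Content-Security-Policy value is omitted because its exact text is not given here.

```rust
/// Chooses the Cache-Control value: static assets are cacheable for one hour,
/// everything else (pages and API responses) is never stored.
fn cache_control_for(path: &str) -> &'static str {
    if path.starts_with("/assets/") || path == "/favicon.ico" {
        "public, max-age=3600"
    } else {
        "no-store"
    }
}

/// Builds the fixed security headers plus the path-dependent cache header.
fn security_headers(path: &str) -> Vec<(&'static str, &'static str)> {
    vec![
        ("Cache-Control", cache_control_for(path)),
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("Referrer-Policy", "no-referrer"),
    ]
}
```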

Routes

The native server exposes the browser page, embedded assets, and JSON API from one Axum router.

| Route | Method | Purpose |
| --- | --- | --- |
| / | GET | Render the admin page. |
| /assets/admin.css | GET | Embedded stylesheet. |
| /assets/admin.js | GET | Embedded JavaScript entrypoint. |
| /assets/admin_ui.js | GET | WASM bindgen JavaScript. |
| /assets/admin_ui_bg.wasm | GET | WASM client module. |
| /favicon.ico | GET | Embedded SVG favicon. |
| /api/session | GET | Return schema, read-only state, CSRF header name, and public auth summary. |
| /api/overview | GET | Return job counts, queues, locked workers, and active workers. |
| /api/jobs | GET | List jobs. |
| /api/jobs | POST | Add a job. |
| /api/jobs/{id} | GET | Fetch one listed job. |
| /api/jobs/action | POST | Complete, fail, run now, or reschedule selected jobs. |
| /api/jobs/remove-by-key | POST | Remove a job by key. |
| /api/maintenance | POST | Run maintenance operations. |

Job Visibility

GET /api/jobs accepts these query parameters:

  • state: all, ready, scheduled, locked, or failed
  • identifier: task identifier filter
  • queue: queue name filter
  • search: text search filter
  • limit: result limit, capped by the native route at 500
  • offset: result offset
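
A client can validate the state and clamp the limit before sending the request. The helper below is a hypothetical sketch: `jobs_url` is not part of any crate in the workspace, it covers only state, limit, and offset, and it does no percent-encoding.

```rust
/// Cap taken from the list above: the native route limits results to 500.
pub const MAX_LIMIT: u32 = 500;

/// States listed above.
pub const STATES: [&str; 5] = ["all", "ready", "scheduled", "locked", "failed"];

/// Build a GET /api/jobs URL, or return None for an unknown state.
pub fn jobs_url(base: &str, state: &str, limit: u32, offset: u32) -> Option<String> {
    if !STATES.contains(&state) {
        return None;
    }
    // The server caps the limit anyway; clamping here keeps the URL honest.
    let limit = limit.min(MAX_LIMIT);
    Some(format!(
        "{base}/api/jobs?state={state}&limit={limit}&offset={offset}"
    ))
}
```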

The response contains ListedJob rows with job metadata such as id, task identifier, queue name, payload, priority, run time, attempts, last error, key, lock owner, revision, flags, and availability.

curl -u "admin:${GRAPHILE_WORKER_ADMIN_PASSWORD}" \
  "http://127.0.0.1:5678/api/jobs?state=failed&limit=50"

Job Mutations

Mutating job routes are disabled when the server runs with --read-only or AdminServerConfig::read_only(true).

Adding a job uses the shared AddJobRequest contract:

{
  "identifier": "send_email",
  "payload": { "user_id": 123 },
  "queue": "mailers",
  "max_attempts": 25,
  "key": "email:123",
  "job_key_mode": "replace",
  "priority": 0,
  "flags": ["transactional"]
}

Supported job key modes are replace, preserve-run-at, and unsafe-dedupe. When job_key_mode is set, key is required.
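
The rule that job_key_mode requires key can be checked before a request is sent. The sketch below is illustrative: the enum and function names are hypothetical, and the mode strings are the ones listed above.

```rust
/// Hypothetical mirror of the three job key modes named above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobKeyMode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

/// Parse a mode string as it appears in the request body.
pub fn parse_job_key_mode(raw: &str) -> Option<JobKeyMode> {
    match raw {
        "replace" => Some(JobKeyMode::Replace),
        "preserve-run-at" => Some(JobKeyMode::PreserveRunAt),
        "unsafe-dedupe" => Some(JobKeyMode::UnsafeDedupe),
        _ => None,
    }
}

/// Enforce "when job_key_mode is set, key is required".
pub fn validate_key_mode(
    key: Option<&str>,
    mode: Option<&str>,
) -> Result<Option<JobKeyMode>, String> {
    let raw = match mode {
        Some(raw) => raw,
        None => return Ok(None),
    };
    let parsed = parse_job_key_mode(raw)
        .ok_or_else(|| format!("unknown job_key_mode: {raw}"))?;
    if key.is_none() {
        return Err("job_key_mode requires key".to_string());
    }
    Ok(Some(parsed))
}
```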

POST /api/jobs/action accepts an action and job ids:

{
  "action": "reschedule",
  "ids": [42, 43],
  "run_at": "2026-01-01T00:00:00Z",
  "priority": 10,
  "attempts": 0,
  "max_attempts": 25
}

Supported actions are:

  • complete
  • fail
  • run-now
  • reschedule

fail uses the provided reason when present and non-empty. Otherwise it uses the admin UI default reason. reschedule requires at least one of run_at, priority, attempts, or max_attempts.
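
Both rules can be expressed as small pure functions. This is a sketch under assumptions: the struct and function names are hypothetical, the integer widths are guesses, the default reason is passed in because its text is not given here, and "non-empty" is read literally, so a whitespace-only reason counts as provided.

```rust
/// Hypothetical view of the optional reschedule fields in the request body.
#[derive(Debug, Default)]
pub struct RescheduleFields<'a> {
    pub run_at: Option<&'a str>,
    pub priority: Option<i16>,
    pub attempts: Option<i16>,
    pub max_attempts: Option<i16>,
}

/// reschedule needs at least one of run_at, priority, attempts, or max_attempts.
pub fn validate_reschedule(fields: &RescheduleFields) -> Result<(), &'static str> {
    let any_set = fields.run_at.is_some()
        || fields.priority.is_some()
        || fields.attempts.is_some()
        || fields.max_attempts.is_some();
    if any_set {
        Ok(())
    } else {
        Err("reschedule requires run_at, priority, attempts, or max_attempts")
    }
}

/// fail uses the provided reason when present and non-empty, else the default.
pub fn fail_reason<'a>(provided: Option<&'a str>, default_reason: &'a str) -> &'a str {
    match provided {
        Some(reason) if !reason.is_empty() => reason,
        _ => default_reason,
    }
}
```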

Jobs can also be removed by key:

{
  "key": "email:123"
}

Maintenance

POST /api/maintenance exposes selected WorkerUtils maintenance operations. It is also disabled in read-only mode.

Supported actions are:

  • migrate
  • cleanup
  • force-unlock
  • sweep-stale-workers

Cleanup runs every cleanup task when cleanup_tasks is omitted, or only the selected subset when it is provided:

{
  "action": "cleanup",
  "cleanup_tasks": [
    "delete-permanently-failed-jobs",
    "gc-task-identifiers",
    "gc-job-queues"
  ]
}

Force unlock requires at least one worker id:

{
  "action": "force-unlock",
  "worker_ids": ["worker-1"]
}

Stale worker sweeping accepts optional thresholds in seconds and supports dry runs:

{
  "action": "sweep-stale-workers",
  "dry_run": true,
  "sweep_threshold_secs": 300,
  "recovery_delay_secs": 60
}

Assets and Client Behavior

The native crate embeds the admin stylesheet, JavaScript, WASM bindgen output, WASM module, and SVG favicon into the server binary. The browser client uses same-origin requests, sends Accept: application/json, includes credentials, and adds the CSRF header on writes.

For bearer and header auth modes, the browser client sends the token on API requests after it has been entered in the UI. For Basic auth, the browser uses same-origin credentials from the HTTP auth challenge.

Migrations

Graphile Worker RS stores its database objects in a PostgreSQL schema and tracks the installed schema revision in a migrations table inside that schema. The graphile_worker_migrations crate owns this setup and exposes the migration runner used to install or update the schema.

The public entry point is graphile_worker_migrations::migrate(database, schema). It accepts a database connection/executor value that can be converted into the crate database type, plus the schema name to manage.

use graphile_worker_migrations::migrate;

migrate(&database, "graphile_worker").await?;

What migrate does

On each run, the migration runner reads:

  • the PostgreSQL server_version_num
  • the latest row in <schema>.migrations
  • the latest row in <schema>.migrations marked as a breaking migration

If <schema>.migrations does not exist, the runner installs the base schema tracking objects first:

create schema if not exists graphile_worker;

create table if not exists graphile_worker.migrations (
    id int primary key,
    ts timestamptz default now() not null,
    breaking boolean not null default false
);

After that, it runs every packaged migration whose number is greater than the latest recorded migration id. Each migration runs in its own database transaction. When the SQL completes, the runner inserts a row into the migrations table with the migration id and whether that migration is marked as breaking, then commits the transaction.

Running migrate again after the schema is current is expected to be harmless: the runner sees the latest recorded revision and skips already-applied migrations.
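
The selection rule described above (run every packaged revision greater than the latest recorded id) is what makes repeated runs harmless. A minimal sketch, with a hypothetical `pending` helper and an in-memory list standing in for the migration registry:

```rust
/// Return the packaged revisions that still need to run.
/// `latest_applied` is None when the migrations table has no rows yet.
pub fn pending(packaged: &[u32], latest_applied: Option<u32>) -> Vec<u32> {
    let floor = latest_applied.unwrap_or(0);
    packaged
        .iter()
        .copied()
        .filter(|revision| *revision > floor)
        .collect()
}
```

When the latest recorded revision equals the highest packaged revision, the result is empty and nothing runs.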

Packaged revisions

Migration SQL is bundled in the graphile_worker_migrations crate under src/sql. At runtime the crate loads those files into the GRAPHILE_WORKER_MIGRATIONS registry in revision order.

The current package contains revisions 1 through 20. The revisions marked as breaking in the registry are:

1, 3, 11, 13, 14, 16

The runner supports taking over from an existing Graphile Worker schema as long as the migrations table accurately records the already-applied revision. For example, if revision 1 is already present, migrate starts at revision 2.

Startup guidance

Run migrations before starting workers that enqueue, poll, or execute jobs:

use graphile_worker_migrations::migrate;

async fn boot(database: &Database) -> Result<(), Box<dyn std::error::Error>> {
    migrate(database, "graphile_worker").await?;

    // Start worker processes only after the schema is installed and current.
    Ok(())
}

This is especially important around breaking revisions. The runner refuses to continue if the database has a newer breaking migration than the currently running package supports. For example, a database whose migrations table records a future breaking revision cannot safely be used by an older worker binary.

If the database records a future non-breaking revision, the runner logs a warning and continues. That warning means the database schema is newer than the current package knows about, so all running Graphile Worker RS versions should be checked for compatibility.
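
The two cases above reduce to a comparison between what the database records and the highest revision the running package knows about. The sketch below uses hypothetical names and is not the runner's actual code.

```rust
/// Outcome of comparing the database state with the running package.
#[derive(Debug, PartialEq, Eq)]
pub enum Compatibility {
    Compatible,
    WarnNewerNonBreaking,
    RefuseNewerBreaking,
}

/// `latest_packaged` is the highest revision bundled with this binary.
/// The two Option values come from the migrations table (None when empty).
pub fn check(
    latest_packaged: u32,
    db_latest: Option<u32>,
    db_latest_breaking: Option<u32>,
) -> Compatibility {
    if db_latest_breaking.is_some_and(|breaking| breaking > latest_packaged) {
        // A future breaking revision: this binary cannot safely use the schema.
        return Compatibility::RefuseNewerBreaking;
    }
    if db_latest.is_some_and(|latest| latest > latest_packaged) {
        // A future non-breaking revision: warn and continue.
        return Compatibility::WarnNewerNonBreaking;
    }
    Compatibility::Compatible
}
```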

PostgreSQL version check

The migration package requires PostgreSQL 12 or newer. It checks current_setting('server_version_num') and returns an incompatible-version error for versions below 120000.

On a fresh schema install, the PostgreSQL version is checked before creating the schema and migrations table. On existing schemas, the version reported while reading migration state is checked before pending migrations are applied.
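
PostgreSQL reports server_version_num as an integer such as 150004 for 15.4, so the check is a numeric comparison against 120000. A sketch with a hypothetical function name and a plain String error instead of the crate's error type:

```rust
/// Threshold from the text above: PostgreSQL 12 or newer.
pub const MIN_SERVER_VERSION_NUM: u32 = 120_000;

/// Parse the value of current_setting('server_version_num') and check it.
pub fn check_postgres_version(server_version_num: &str) -> Result<u32, String> {
    let parsed: u32 = server_version_num
        .trim()
        .parse()
        .map_err(|err| format!("invalid server_version_num: {err}"))?;
    if parsed < MIN_SERVER_VERSION_NUM {
        return Err(format!(
            "PostgreSQL {parsed} is older than the required {MIN_SERVER_VERSION_NUM}"
        ));
    }
    Ok(parsed)
}
```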

Locked jobs during migration 11

Migration 11 has a specific guard for locked jobs. If PostgreSQL returns SQL state 22012 while applying that migration, the runner returns MigrateError::LockedJobInMigration11.

Before applying a package version that still needs to run migration 11, stop workers cleanly and make sure jobs and queues are not left locked. Then rerun migrate.

Custom schemas

The schema argument controls where the Graphile Worker objects are installed:

migrate(&database, "my_worker_schema").await?;

Packaged migration SQL uses the :GRAPHILE_WORKER_SCHEMA placeholder internally. The migration executor replaces that placeholder with the escaped schema name before executing each SQL statement.
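
The substitution can be pictured as a string replace with a quoted identifier. The escaping below is standard PostgreSQL identifier quoting (wrap in double quotes, double any embedded quote). Whether the crate escapes the name exactly this way is an assumption, and both function names are hypothetical.

```rust
/// Quote a PostgreSQL identifier: wrap in double quotes and double any
/// embedded double quote.
pub fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Replace the schema placeholder in one migration statement.
pub fn render(sql: &str, schema: &str) -> String {
    sql.replace(":GRAPHILE_WORKER_SCHEMA", &quote_ident(schema))
}
```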

Deployment

Graphile Worker RS runs as an application process backed by PostgreSQL. A production deployment should treat the worker like any other stateful database client: size the PostgreSQL pool deliberately, run schema migrations before work starts, handle shutdown signals, and expose enough logs or hooks to understand job flow.

Database and Migrations

Each worker needs a PostgreSQL connection, either from an existing database pool or from a connection URL:

let worker = graphile_worker::WorkerOptions::default()
    .database_url(&std::env::var("DATABASE_URL")?)
    .schema("graphile_worker")
    .define_job::<SendEmail>()
    .init()
    .await?;

If you already manage your own pool, pass it to the worker instead:

let pg_pool = sqlx::postgres::PgPoolOptions::new()
    .max_connections(10)
    .connect(&std::env::var("DATABASE_URL")?)
    .await?;

let worker = graphile_worker::WorkerOptions::default()
    .pg_pool(pg_pool)
    .schema("graphile_worker")
    .define_job::<SendEmail>()
    .init()
    .await?;

WorkerOptions::init() runs the Graphile Worker migrations for the configured schema before registering task handlers and returning a runnable worker. The default schema is graphile_worker; choose a different schema when you want to isolate worker tables for an application, environment, or tenant.

The CLI can also run migrations explicitly:

graphile-worker --database-url "$DATABASE_URL" migrate

Pool and Concurrency

concurrency controls how many jobs a worker process can run at the same time. If you do not set it, the worker defaults to the number of logical CPUs on the host. Set it explicitly in production so the same artifact behaves consistently across instance sizes.

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(8)
    .database_url(&database_url)
    .max_pg_conn(20)
    .define_job::<SendEmail>()
    .init()
    .await?;

When the worker creates its own pool from database_url, max_pg_conn sets the maximum number of database connections. If you pass an existing pool with pg_pool or database, the pool's own configuration is used and max_pg_conn is ignored.

Use practical sizing:

  • CPU-heavy jobs usually need concurrency near the available CPU capacity.
  • I/O-heavy jobs can often run with higher concurrency.
  • Total database connections are the sum of the pool sizes across all worker replicas, plus whatever the rest of your application uses.
  • Keep enough pool capacity for polling, job updates, LISTEN/NOTIFY handling, recovery, cron scheduling, and any database work done inside job handlers.
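
The connection budget in the list above is simple arithmetic. The sketch below is a planning aid with hypothetical names and made-up numbers, not something the worker computes.

```rust
/// Worst-case connections: every replica fills its pool, plus the rest of the app.
pub fn total_connections(replicas: u32, max_pg_conn: u32, other_app_connections: u32) -> u32 {
    replicas
        .saturating_mul(max_pg_conn)
        .saturating_add(other_app_connections)
}

/// Check the budget against PostgreSQL's max_connections, keeping some
/// connections reserved for superuser or maintenance sessions.
pub fn fits(total: u32, postgres_max_connections: u32, reserved: u32) -> bool {
    total <= postgres_max_connections.saturating_sub(reserved)
}
```

For example, 4 replicas with max_pg_conn(20) plus 30 application connections need 110 connections in the worst case, which does not fit a server limited to 100.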

The worker polls PostgreSQL to pick up scheduled jobs and any work that notifications alone do not surface. The default poll interval is one second. Lower intervals can improve responsiveness but increase database load:

let worker = graphile_worker::WorkerOptions::default()
    .poll_interval(std::time::Duration::from_millis(500))
    .define_job::<SendEmail>()
    .database_url(&database_url)
    .init()
    .await?;

Worker Process

A typical production process creates the worker, then calls run():

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let worker = graphile_worker::WorkerOptions::default()
        .concurrency(8)
        .database_url(&std::env::var("DATABASE_URL")?)
        .schema("graphile_worker")
        .define_job::<SendEmail>()
        .init()
        .await?;

    worker.run().await?;
    Ok(())
}

Register every task this process is allowed to execute with define_job, define_batch_job, or define_jobs. To split work across specialized worker deployments, use job flags and add_forbidden_flag so a process skips jobs that do not belong on that deployment:

let worker = graphile_worker::WorkerOptions::default()
    .add_forbidden_flag("high_memory")
    .define_job::<SendEmail>()
    .database_url(&database_url)
    .init()
    .await?;

Jobs that share a queue name run one at a time, in series. Use queue names when you need per-user, per-account, or per-resource ordering while still allowing other queues to run concurrently.

Shutdown Signals

By default, the worker listens for OS shutdown signals such as SIGINT and SIGTERM. This is the right behavior for most containers and process managers: send a shutdown signal and give the process enough grace time to finish or release in-flight work.

Tune graceful shutdown with WorkerShutdownConfig or the convenience setters:

let worker = graphile_worker::WorkerOptions::default()
    .shutdown_grace_period(std::time::Duration::from_secs(30))
    .shutdown_interrupted_job_retry_delay(std::time::Duration::from_secs(30))
    .define_job::<SendEmail>()
    .database_url(&database_url)
    .init()
    .await?;

If your host application already owns signal handling, disable the built-in OS listeners and pass a future that resolves when shutdown should begin:

let worker = graphile_worker::WorkerOptions::default()
    .listen_os_shutdown_signals(false)
    .shutdown_signal(on_shutdown())
    .define_job::<SendEmail>()
    .database_url(&database_url)
    .init()
    .await?;

Graphile Worker still handles graceful draining after the shutdown signal is received.

Recovery

Worker recovery is opt-in. Enable it when a job may be left locked after a process crash, network partition, forced abort, or orchestrator timeout. When enabled, workers heartbeat in PostgreSQL and a sweeper recovers jobs owned by workers that have stopped heartbeating.

let worker = graphile_worker::WorkerOptions::default()
    .heartbeat_interval(std::time::Duration::from_secs(30))
    .sweep_interval(std::time::Duration::from_secs(60))
    .sweep_threshold(std::time::Duration::from_secs(300))
    .recovery_delay(std::time::Duration::from_secs(30))
    .define_job::<SendEmail>()
    .database_url(&database_url)
    .init()
    .await?;

Recovery sweeps use a PostgreSQL advisory lock so only one sweeper performs the recovery work at a time. Recovered jobs are unlocked, their attempt count is decremented, their queue lock is released, and their run_at is delayed by the configured recovery delay.
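
The effect of a sweep on one job can be sketched in plain Rust. Everything here is illustrative: the struct, the strict `>` comparison against the threshold, and the decrement by exactly one are assumptions, and the real work happens in SQL.

```rust
use std::time::Duration;

/// A worker is treated as stale once its last heartbeat is older than the threshold.
pub fn is_stale(since_last_heartbeat: Duration, sweep_threshold: Duration) -> bool {
    since_last_heartbeat > sweep_threshold
}

/// Hypothetical, simplified view of a locked job row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LockedJob {
    pub attempts: u32,
    pub locked_by: Option<String>,
    pub run_at_secs: u64,
}

/// Unlock the job, undo the interrupted attempt, and delay the next run.
pub fn recover(job: &LockedJob, now_secs: u64, recovery_delay: Duration) -> LockedJob {
    LockedJob {
        attempts: job.attempts.saturating_sub(1),
        locked_by: None,
        run_at_secs: now_secs + recovery_delay.as_secs(),
    }
}
```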

You can run the same kind of recovery manually from the CLI:

graphile-worker --database-url "$DATABASE_URL" sweep-stale-workers --dry-run
graphile-worker --database-url "$DATABASE_URL" sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s

Use --dry-run first when investigating a production incident.

Cron

Cron entries are configured on the worker. Use the typed cron API when the task type is available in Rust:

use graphile_worker::{Cron, CrontabFill, WorkerOptions};

let worker = WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron(
        Cron::daily_at::<SendDailyReport>(8, 0)?
            .fill(CrontabFill::hours(1)),
    )
    .database_url(&database_url)
    .init()
    .await?;

Crontab text is also supported:

let worker = graphile_worker::WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron("0 8 * * * send_daily_report")?
    .database_url(&database_url)
    .init()
    .await?;

For recurring jobs, decide which deployment owns each cron definition. Running the same cron configuration in multiple independent deployments can schedule the same recurring work more often than intended unless the job key and cron settings are chosen to deduplicate the workload.

Local Queue

The local queue batch-fetches jobs from PostgreSQL and serves them from a local cache. This can reduce database round trips for high-throughput deployments, at the cost of keeping jobs in a process-local cache until they are claimed, returned, or the cache TTL expires.

use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;

let worker = WorkerOptions::default()
    .local_queue(
        LocalQueueConfig::default()
            .with_size(100)
            .with_queue_count(2)
            .with_ttl(Duration::from_secs(300))
            .with_refetch_delay(
                RefetchDelayConfig::default()
                    .with_duration(Duration::from_millis(100))
                    .with_threshold(10)
                    .with_max_abort_threshold(500),
            ),
    )
    .define_job::<SendEmail>()
    .database_url(&database_url)
    .init()
    .await?;

size applies per local queue, so total local capacity is size * queue_count. Workers configured with forbidden_flags bypass the local queue and fetch jobs directly from the database. Benchmark realistic jobs before turning this on for all production workers; the best settings depend on PostgreSQL latency, pool size, worker concurrency, job duration, and the number of replicas.

Observability

Graphile Worker emits useful events through lifecycle hooks and plugins. Use hooks for metrics, structured logs, tracing spans, validation, or custom recovery policy.

use graphile_worker::{
    HookRegistry, JobComplete, JobFail, JobStart, Plugin, WorkerShutdown, WorkerStart,
};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Debug)]
struct MetricsPlugin {
    jobs_started: AtomicU64,
    jobs_completed: AtomicU64,
    jobs_failed: AtomicU64,
}

impl Plugin for MetricsPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            tracing::info!(worker_id = %ctx.worker_id, "worker started");
        });

        let jobs_started = Arc::new(self.jobs_started);
        let jobs_completed = Arc::new(self.jobs_completed);
        let jobs_failed = Arc::new(self.jobs_failed);

        {
            let jobs_started = jobs_started.clone();
            hooks.on(JobStart, move |_ctx| {
                let jobs_started = jobs_started.clone();
                async move {
                    jobs_started.fetch_add(1, Ordering::Relaxed);
                }
            });
        }

        {
            let jobs_completed = jobs_completed.clone();
            hooks.on(JobComplete, move |ctx| {
                let jobs_completed = jobs_completed.clone();
                async move {
                    jobs_completed.fetch_add(1, Ordering::Relaxed);
                    tracing::info!(duration = ?ctx.duration, "job completed");
                }
            });
        }

        {
            let jobs_failed = jobs_failed.clone();
            hooks.on(JobFail, move |ctx| {
                let jobs_failed = jobs_failed.clone();
                async move {
                    jobs_failed.fetch_add(1, Ordering::Relaxed);
                    tracing::warn!(error = %ctx.error, will_retry = ctx.will_retry, "job failed");
                }
            });
        }

        hooks.on(WorkerShutdown, async |ctx| {
            tracing::info!(worker_id = %ctx.worker_id, reason = ?ctx.reason, "worker shutting down");
        });
    }
}

Register plugins at startup:

let worker = graphile_worker::WorkerOptions::default()
    .add_plugin(MetricsPlugin {
        jobs_started: AtomicU64::new(0),
        jobs_completed: AtomicU64::new(0),
        jobs_failed: AtomicU64::new(0),
    })
    .define_job::<SendEmail>()
    .database_url(&database_url)
    .init()
    .await?;

For operational inspection, the CLI can list jobs and inspect worker state:

graphile-worker --database-url "$DATABASE_URL" list --state ready
graphile-worker --database-url "$DATABASE_URL" stats
graphile-worker --database-url "$DATABASE_URL" queues
graphile-worker --database-url "$DATABASE_URL" workers

Deployment Checklist

  • Provide DATABASE_URL or a configured PostgreSQL pool.
  • Pick a schema and run migrations with init() or the CLI migrate command.
  • Set concurrency explicitly for predictable behavior across instance sizes.
  • Size database pools for all replicas and job-handler database work.
  • Ensure SIGTERM reaches the worker process and the orchestrator grace period is longer than the worker grace period.
  • Enable recovery when forced exits or node failures are part of your failure model.
  • Assign cron ownership deliberately.
  • Turn on local queue only after measuring throughput and latency with realistic jobs.
  • Add lifecycle hooks or plugins for logs, metrics, and failure visibility.

Observability

Graphile Worker RS exposes observability through standard Rust tracing, optional OpenTelemetry compatibility features, and lifecycle hooks. The worker does not force a logging or metrics backend on applications; install the subscriber, exporter, and hook plugin that match your runtime.

Tracing

The main crate depends on tracing, so worker internals and application task handlers can emit spans and events into the subscriber you install. The hooks example uses tracing-subscriber with an environment filter and a formatted output layer:

use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};

pub fn enable_logs() {
    let fmt_layer = tracing_subscriber::fmt::layer();
    let filter_layer = EnvFilter::try_new("debug,sqlx=warn").unwrap();

    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();
}

Initialize the subscriber once, before starting the worker. The example keeps SQLx noise at warn while enabling debug logging elsewhere.

OpenTelemetry compatibility

OpenTelemetry support is feature-gated. Enable exactly one compatibility feature:

[dependencies]
graphile_worker = { version = "0.13", features = ["opentelemetry_0_32"] }

Available compatibility features are:

| Feature | OpenTelemetry crate version |
|---|---|
| opentelemetry_0_30 | opentelemetry 0.30 |
| opentelemetry_0_31 | opentelemetry 0.31 |
| opentelemetry_0_32 | opentelemetry 0.32 |

The crate rejects builds that enable more than one of these features at the same time.

When an OpenTelemetry feature is enabled, job insertion code can capture the current span context and write it into the job payload under _trace. Object payloads receive the field directly. Array payloads receive it on each object item in the array, while scalar values are left unchanged.

{
  "user_id": 42,
  "_trace": {
    "flags": 1,
    "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
    "span_id": "00f067aa0ba902b7"
  }
}

When the worker later runs a job, it reads _trace from object payloads and adds the recorded span context as a link to the current span. Invalid, non-object, or missing trace payloads are ignored. Without an OpenTelemetry feature, this trace extraction and linking path is a no-op.
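
The injection rule (objects get the field, arrays get it on each object item, scalars are untouched) can be shown with a tiny JSON model. This sketch avoids external crates by defining its own hypothetical `Json` enum. The real code works on the job payload's JSON value and builds _trace from the current span context.

```rust
use std::collections::BTreeMap;

/// Minimal JSON model, just enough to show the rule.
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Str(String),
    Num(i64),
    Array(Vec<Json>),
    Object(BTreeMap<String, Json>),
}

/// Write `trace` under "_trace" following the rule described above.
pub fn inject_trace(payload: &mut Json, trace: &Json) {
    match payload {
        Json::Object(map) => {
            map.insert("_trace".to_string(), trace.clone());
        }
        Json::Array(items) => {
            // Only object items receive the field; other items are skipped.
            for item in items.iter_mut() {
                if let Json::Object(map) = item {
                    map.insert("_trace".to_string(), trace.clone());
                }
            }
        }
        // Scalar payloads are left unchanged.
        _ => {}
    }
}
```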

Metrics with lifecycle hooks

Lifecycle hooks are the extension point for metrics and structured operational events. A plugin registers callbacks on HookRegistry; those callbacks receive context objects for worker and job events.

The metrics example counts started, completed, and failed jobs with atomics:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use graphile_worker::{
    HookRegistry, JobComplete, JobFail, JobStart, Plugin, WorkerShutdown, WorkerStart,
};

pub struct MetricsPlugin {
    jobs_started: AtomicU64,
    jobs_completed: AtomicU64,
    jobs_failed: AtomicU64,
}

impl Plugin for MetricsPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            println!("[MetricsPlugin] Worker {} started", ctx.worker_id);
        });

        let jobs_started = Arc::new(self.jobs_started);
        let jobs_completed = Arc::new(self.jobs_completed);
        let jobs_failed = Arc::new(self.jobs_failed);

        {
            let jobs_started = jobs_started.clone();
            hooks.on(JobStart, move |ctx| {
                let jobs_started = jobs_started.clone();
                async move {
                    jobs_started.fetch_add(1, Ordering::Relaxed);
                    println!(
                        "[MetricsPlugin] Job {} started (task: {})",
                        ctx.job.id(),
                        ctx.job.task_identifier()
                    );
                }
            });
        }

        {
            let jobs_completed = jobs_completed.clone();
            hooks.on(JobComplete, move |ctx| {
                let jobs_completed = jobs_completed.clone();
                async move {
                    jobs_completed.fetch_add(1, Ordering::Relaxed);
                    println!(
                        "[MetricsPlugin] Job {} completed in {:?}",
                        ctx.job.id(),
                        ctx.duration
                    );
                }
            });
        }

        {
            let jobs_failed = jobs_failed.clone();
            hooks.on(JobFail, move |ctx| {
                let jobs_failed = jobs_failed.clone();
                async move {
                    jobs_failed.fetch_add(1, Ordering::Relaxed);
                    println!(
                        "[MetricsPlugin] Job {} failed: {} (will_retry: {})",
                        ctx.job.id(),
                        ctx.error,
                        ctx.will_retry
                    );
                }
            });
        }

        hooks.on(WorkerShutdown, move |ctx| async move {
            println!(
                "[MetricsPlugin] Worker {} shutting down (reason: {:?})",
                ctx.worker_id, ctx.reason
            );
        });
    }
}

Use the same hook points to export counters, histograms, or structured events to your metrics backend:

| Hook | Useful signal |
|---|---|
| WorkerStart | worker process started and has a worker id |
| WorkerShutdown | worker process is stopping and exposes the shutdown reason |
| JobStart | job id and task identifier started executing |
| JobComplete | job completed and exposes execution duration |
| JobFail | job failed, exposes the error and whether it will retry |

Logging

For application logs, combine a subscriber with hook callbacks and task-handler instrumentation. The hook contexts expose job ids, task identifiers, errors, durations, worker ids, and shutdown reasons, which are the stable values to put into operational logs.

For example, a JobFail hook can log the job id, error, and retry decision; JobComplete can log duration; WorkerShutdown can log the shutdown reason. The example code prints these values with println!, but production code will usually emit tracing events or send them to a metrics/logging client.

Admin and monitor visibility

The workspace includes graphile_worker_admin_api, graphile_worker_admin_ui, and graphile_worker_admin_ui_client crates. Use those admin surfaces for queue visibility, and use hooks and tracing for process-local signals that admin views cannot infer on their own, such as per-process shutdown reasons, local counters, and span links.

For runtime monitoring, the most useful signals visible from the current public hooks are:

| Signal | Source |
|---|---|
| Worker started | WorkerStart hook |
| Worker shutdown reason | WorkerShutdown hook |
| Job throughput | JobStart and JobComplete hooks |
| Job failure rate | JobFail hook |
| Job duration | JobComplete hook |
| Trace continuity from enqueue to execution | _trace payload plus OpenTelemetry span links |
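
Counters fed by the JobStart, JobComplete, and JobFail hooks are enough to derive a failure rate and an in-flight estimate. The sketch below shows only the arithmetic. `JobCounters` is a hypothetical type, and it assumes each started job ends in exactly one JobComplete or JobFail event.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counters, incremented from the hook callbacks.
#[derive(Debug, Default)]
pub struct JobCounters {
    pub started: AtomicU64,
    pub completed: AtomicU64,
    pub failed: AtomicU64,
}

impl JobCounters {
    /// Fraction of finished attempts that failed, or None before any finish.
    pub fn failure_rate(&self) -> Option<f64> {
        let completed = self.completed.load(Ordering::Relaxed);
        let failed = self.failed.load(Ordering::Relaxed);
        let finished = completed + failed;
        if finished == 0 {
            None
        } else {
            Some(failed as f64 / finished as f64)
        }
    }

    /// Jobs that have started but not yet completed or failed.
    pub fn in_flight(&self) -> u64 {
        let finished =
            self.completed.load(Ordering::Relaxed) + self.failed.load(Ordering::Relaxed);
        self.started.load(Ordering::Relaxed).saturating_sub(finished)
    }
}
```

Relaxed ordering is enough here because the counters are independent statistics. A snapshot may be slightly inconsistent while jobs are finishing.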

Reference

Use this section when you already know the concept you need and want exact package, feature, API, compatibility, or troubleshooting details.

The guide pages explain how to build common workflows. The reference pages are for looking up the supported surface area and checking which crate, feature flag, or documentation source applies to your use case.

docs.rs and this guide

The generated Rust API documentation is published on docs.rs. Use docs.rs when you need item-level details for public Rust types, traits, builders, and re-exports from the graphile_worker crate.

Use this mdBook guide when you need operational or architectural context.

For example, look up the exact methods and trait bounds on docs.rs, then return to the guide for the surrounding setup:

use graphile_worker::{TaskHandler, WorkerOptions};

// Check docs.rs for the exact public API of each type.
// Check the guide for configuration examples and operational context.

Public crate surface

The top-level graphile_worker crate re-exports the main types used by applications:

  • Worker for running workers.
  • WorkerOptions, CronInput, and WorkerBuildError for configuration.
  • WorkerUtils for job management helpers.
  • TaskHandler and related task handling types.
  • Job and job specification types from the job crates.
  • Context, database, lifecycle hook, shutdown signal, and cron types.
  • LocalQueue and its configuration and error types.
  • Worker recovery types such as WorkerRecoveryConfig and SweepStaleWorkersOptions.

The workspace also contains smaller crates for specific responsibilities. See Crate Map when you need to know which package owns a type or feature area.

Feature flags

The default graphile_worker feature set enables:

default = ["runtime-tokio", "tls-rustls", "driver-sqlx"]

The visible feature groups are:

  • Runtime: runtime-tokio, runtime-async-std.
  • TLS: tls-rustls, tls-native-tls.
  • Database driver: driver-sqlx, driver-tokio-postgres.
  • OpenTelemetry compatibility: opentelemetry_0_30, opentelemetry_0_31, opentelemetry_0_32.

Start with the defaults unless you have a specific runtime, TLS, database driver, or OpenTelemetry version requirement. See Feature Flags and Runtime, TLS, and Drivers before changing them.

Compatibility checklist

Before changing a dependency or feature set, check the compatibility page for the constraints that matter across the workspace:

  • Runtime feature enabled.
  • Database driver feature enabled.
  • TLS backend enabled.
  • OpenTelemetry compatibility feature matching your tracing stack.

See Compatibility for the reference checklist.

Troubleshooting entry points

For setup or runtime issues, start with the page closest to the symptom.

When the issue is about a specific Rust type or trait, use the API Reference page as the bridge to docs.rs.

Feature Flags

The graphile_worker crate uses feature flags to select the async runtime, TLS implementation, PostgreSQL driver, and optional OpenTelemetry compatibility version.

Default features enable the most common setup:

[dependencies]
graphile_worker = "0.13"

This is equivalent to enabling:

[dependencies]
graphile_worker = { version = "0.13", features = ["runtime-tokio", "tls-rustls", "driver-sqlx"] }

Defaults

The default feature set is:

  • runtime-tokio
  • tls-rustls
  • driver-sqlx

Use default-features = false when you want to choose a different runtime, driver, or TLS backend.

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-async-std", "tls-rustls", "driver-sqlx"] }

Runtime Features

Runtime features select the async runtime used by the root crate and its internal crates.

| Feature | Purpose |
|---|---|
| runtime-tokio | Enables Tokio runtime support. This is enabled by default. |
| runtime-async-std | Enables async-std runtime support. |

The driver-tokio-postgres feature also enables runtime-tokio, so that driver is a Tokio-only combination in the root crate feature graph.

TLS Features

TLS features select the TLS implementation forwarded to database-related crates and, when SQLx is enabled, to SQLx.

| Feature | Purpose |
|---|---|
| tls-rustls | Enables rustls TLS support. This is enabled by default. |
| tls-native-tls | Enables native-tls support. |

The root feature list does not force exactly one TLS backend. Pick the one you intend to use instead of enabling both accidentally.

Database Driver Features

Driver features select the PostgreSQL access layer used by database-related crates.

| Feature | Purpose |
|---|---|
| driver-sqlx | Enables SQLx support and the optional sqlx dependency. This is enabled by default. |
| driver-tokio-postgres | Enables tokio-postgres support and also enables runtime-tokio. |

SQLx is the default driver. The tokio-postgres driver is available as an alternative Tokio-based driver.

OpenTelemetry Features

OpenTelemetry support is versioned so downstream applications can choose the compatibility line they use.

| Feature | Dependencies enabled |
|---|---|
| opentelemetry_0_30 | opentelemetry 0.30 and matching tracing-opentelemetry support |
| opentelemetry_0_31 | opentelemetry 0.31 and matching tracing-opentelemetry support |
| opentelemetry_0_32 | opentelemetry 0.32 and matching tracing-opentelemetry support |

These features are not enabled by default. Enable the one that matches the OpenTelemetry version used by the rest of your application.

[dependencies]
graphile_worker = { version = "0.13", features = ["opentelemetry_0_32"] }

Common Combinations

Default Tokio, rustls, SQLx

Use the default dependency declaration when you want Tokio, rustls, and SQLx.

[dependencies]
graphile_worker = "0.13"

Tokio, native-tls, SQLx

Disable defaults and select the TLS backend explicitly.

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-tokio", "tls-native-tls", "driver-sqlx"] }

async-std, rustls, SQLx

Use the async-std runtime with the SQLx driver.

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-async-std", "tls-rustls", "driver-sqlx"] }

Tokio, tokio-postgres

The tokio-postgres driver enables runtime-tokio through the feature graph.

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["driver-tokio-postgres"] }

If your connection setup needs TLS, also select the TLS feature you intend to use:

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["driver-tokio-postgres", "tls-rustls"] }

Tokio, SQLx, OpenTelemetry 0.32

OpenTelemetry features can be added to the default feature set.

[dependencies]
graphile_worker = { version = "0.13", features = ["opentelemetry_0_32"] }

Feature Matrix Used by Local Checks

The repository runs its runtime matrix locally with one command:

just test-docker-all-matrices

That command covers:

| Runtime | Driver | TLS |
| --- | --- | --- |
| runtime-tokio | driver-sqlx | tls-rustls |
| runtime-async-std | driver-sqlx | tls-rustls |
| runtime-tokio | driver-tokio-postgres | not selected by the test recipe |

The test-runtime recipe rejects driver-tokio-postgres with runtime-async-std, matching the feature relationship where driver-tokio-postgres enables Tokio.

Cautions

Disable defaults when replacing a default feature. For example, adding tls-native-tls without default-features = false leaves tls-rustls enabled too.

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-tokio", "tls-native-tls", "driver-sqlx"] }

Choose one database driver unless you have checked that your application really needs both. The default driver is SQLx, so enabling driver-tokio-postgres without disabling defaults enables both drivers.

[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["driver-tokio-postgres"] }

Use an OpenTelemetry feature only when your dependency graph uses that matching OpenTelemetry line. The root crate exposes separate compatibility features for 0.30, 0.31, and 0.32.
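
The cautions above can be checked mechanically. The following is a minimal sketch, not part of graphile_worker: effective_features and conflicts are hypothetical helpers that model only the relationships stated in this chapter (the default feature list, and driver-tokio-postgres enabling runtime-tokio).

```rust
use std::collections::BTreeSet;

/// Default features of the root crate, as listed in this chapter.
const DEFAULT_FEATURES: [&str; 3] = ["runtime-tokio", "tls-rustls", "driver-sqlx"];

/// Hypothetical helper: the feature set a dependency declaration ends up with,
/// using only the relationships described in this chapter.
fn effective_features(default_features: bool, requested: &[&str]) -> BTreeSet<String> {
    let mut set: BTreeSet<String> = requested.iter().map(|f| f.to_string()).collect();
    if default_features {
        set.extend(DEFAULT_FEATURES.iter().map(|f| f.to_string()));
    }
    // driver-tokio-postgres enables runtime-tokio through the feature graph.
    if set.contains("driver-tokio-postgres") {
        set.insert("runtime-tokio".to_string());
    }
    set
}

/// Hypothetical helper: names every feature family with more than one member enabled.
fn conflicts(features: &BTreeSet<String>) -> Vec<&'static str> {
    let families = [
        ("tls-", "TLS backend"),
        ("driver-", "database driver"),
        ("runtime-", "runtime"),
    ];
    families
        .iter()
        .filter(|(prefix, _)| features.iter().filter(|f| f.starts_with(*prefix)).count() > 1)
        .map(|(_, name)| *name)
        .collect()
}
```

Calling conflicts(&effective_features(true, &["tls-native-tls"])) reports the TLS backend family, which is the accidental double selection described above. In this model, requesting driver-tokio-postgres together with runtime-async-std reports the runtime family for the same reason.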

Illustrative Rust examples in this documentation may require additional setup, such as task handlers, worker options, and a database URL. See the quick start for a complete first worker.

// Feature selection happens in Cargo.toml, not in Rust source.
use graphile_worker::WorkerOptions;

Crate Map

Graphile Worker RS is published as a main application crate plus smaller crates that hold reusable pieces of the worker, cron, migration, admin, and database layers. Most applications should depend on graphile_worker; the other crates are useful when you need a narrower API surface, are building tooling around the worker schema, or are working on Graphile Worker RS itself.

Main User-Facing Crate

| Crate | Use it for |
| --- | --- |
| graphile_worker | Running workers, adding jobs, defining tasks, configuring cron, applying migrations, and using the public worker API from one dependency. |

The root crate re-exports the core types that applications normally need:

  • Worker and WorkerOptions for worker setup.
  • WorkerUtils for job management.
  • TaskHandler and related task handler types.
  • JobSpec and job data types.
  • Cron, the crontab parser, and crontab value types.
  • Lifecycle hook, context, database, and shutdown signal types.
  • Dead worker recovery types such as SweepStaleWorkersOptions.

For a normal worker binary, start here:

use graphile_worker::WorkerOptions;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let worker = WorkerOptions::default()
        .database_url("postgres://postgres:postgres@localhost/postgres")
        .define_job::<SendEmail>()
        .init()
        .await?;

    worker.run().await?;
    Ok(())
}

For job management without running a worker loop, use WorkerUtils from the same crate:

use graphile_worker::{JobSpec, WorkerUtils};

async fn enqueue(utils: &WorkerUtils) -> graphile_worker::errors::Result<()> {
    utils
        .add_raw_job(
            "send_email",
            serde_json::json!({ "to": "user@example.com" }),
            JobSpec::builder().queue_name("mailers").build(),
        )
        .await?;

    Ok(())
}

User-Facing Building Blocks

These crates provide the types application code is most likely to see through graphile_worker re-exports. Depend on them directly only when you are sharing small interfaces across crates and do not want the full worker dependency.

| Crate | What it contains |
| --- | --- |
| graphile_worker_task_handler | TaskHandler and reusable JobDefinition values for defining job handlers. |
| graphile_worker_job_spec | Job options such as queue name, run time, max attempts, priority, and deduplication keys. |
| graphile_worker_job | Job row/data types used by handlers, hooks, queries, and utilities. |
| graphile_worker_ctx | Worker context passed through job execution. |
| graphile_worker_extensions | Extension storage used to attach and retrieve shared application state. |
| graphile_worker_lifecycle_hooks | Hook registry and lifecycle events around worker startup, job execution, completion, and cron ticks. |
| graphile_worker_shutdown_signal | Cross-platform shutdown signal handling for graceful worker shutdown. |

For example, a helper crate that only defines task handlers can depend on the task handler and context crates instead of depending on the whole worker:

use graphile_worker_ctx::WorkerContext;
use graphile_worker_task_handler::{IntoTaskHandlerResult, TaskHandler};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        let _ = (self.to, ctx);
        Ok::<(), String>(())
    }
}

Cron Crates

Cron support is split into parser, shared types, and runner pieces.

| Crate | What it contains |
| --- | --- |
| graphile_worker_crontab_types | Crontab value types and utilities, including timers, fields, fills, and job key mode types. |
| graphile_worker_crontab_parser | Parser for crontab configuration. |
| graphile_worker_crontab_runner | Runner that schedules crontab entries against the worker database and lifecycle hooks. |

Applications usually get these through graphile_worker, which re-exports parse_crontab, Cron, CronBuilder, Crontab, CrontabTimer, and related types.

Support Crates

These crates are useful for tools, integrations, and advanced applications that need to manage jobs or schema state without pulling every top-level worker API into their own public surface.

| Crate | What it contains |
| --- | --- |
| graphile_worker_utils | Job management helpers used by WorkerUtils, including adding, removing, rescheduling, and other worker utility operations. |
| graphile_worker_queries | Database query helpers for Graphile Worker job and worker operations. |
| graphile_worker_recovery | Dead worker recovery helpers, including stale worker sweeping and recovery configuration. |
| graphile_worker_migrations | Database migrations for the Graphile Worker schema. |

According to the root crate's documentation, the managed schema is graphile_worker by default and contains private tables for jobs, tasks, job queues, and workers. These support crates are the lower-level pieces used to manage that schema and operate on its rows.

Internal-ish Infrastructure Crates

These crates are part of the workspace API surface, but most applications should not need to depend on them directly.

| Crate | What it contains |
| --- | --- |
| graphile_worker_database | Database driver abstraction used by worker, migrations, queries, recovery, cron, and utilities. It has feature support for sqlx and tokio-postgres drivers. |
| graphile_worker_runtime | Async runtime compatibility helpers for Tokio and async-std feature combinations. |
| graphile_worker_task_details | Shared mapping between task IDs and task identifiers. |
| graphile_worker_migrations_core | Core migration types used by the migrations crate. |
| graphile_worker_migrations_macros | Procedural macros used by the migrations crate. |

These crates exist to keep feature flags and dependencies focused. For example, database-facing crates share the driver-sqlx, driver-tokio-postgres, runtime-tokio, runtime-async-std, tls-rustls, and tls-native-tls feature families instead of hard-coding one driver and runtime everywhere.

Admin And CLI Crates

The workspace also includes crates for operational tools around a Graphile Worker installation.

| Crate | What it contains |
| --- | --- |
| graphile_worker_cli | The graphile-worker binary for migrations, job management, admin server startup, stats, queues, workers, cleanup, and stale worker sweeping. |
| graphile_worker_admin_api | Shared admin API contracts and queries. |
| graphile_worker_admin_ui | Embedded Leptos admin UI and server-side assets. |
| graphile_worker_admin_ui_client | Leptos WASM client for the embedded admin UI. |

The CLI binary is installed as graphile-worker and connects with --database-url or DATABASE_URL. Its README lists commands such as:

graphile-worker migrate
graphile-worker add send_email --payload '{"to":"user@example.com"}'
graphile-worker list --state ready
graphile-worker complete 123 124
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker sweep-stale-workers --dry-run

Choosing A Dependency

Use graphile_worker unless you have a concrete reason not to. It is the stable entry point for application code and re-exports the main worker, task, job, cron, migration, hook, database, recovery, and utility types.

Use a smaller crate directly when:

  • A library crate only needs to expose task handler types.
  • A tool only needs job specs, job row types, or query helpers.
  • An integration needs admin API contracts without embedding the UI.
  • You are extending Graphile Worker RS itself and need the same internal boundaries as the workspace.

API Reference

This page is a map of the public Rust API. For complete signatures, trait bounds, and generated item documentation, use the docs.rs links in each section.

Most applications should start with the root crate:

  • graphile_worker - worker setup, task registration, job scheduling, cron helpers, lifecycle hooks, shutdown, recovery, and the most commonly used re-exports.

Worker Setup

Use WorkerOptions to configure and initialize a worker, then call Worker::run or Worker::run_once.

Important public types:

  • WorkerOptions - builder-style worker configuration.
  • WorkerBuildError - initialization errors.
  • Worker - initialized worker runtime.
  • WorkerShutdownConfig - graceful shutdown behavior.
  • ShutdownSignal - shareable shutdown future.

Common configuration methods include:

  • database_url, database, and pg_pool for database connections.
  • schema, concurrency, poll_interval, max_pg_conn, and use_local_time.
  • define_job, define_batch_job, and define_jobs for task registration.
  • add_forbidden_flag for workers that skip jobs with specific flags.
  • local_queue, complete_job_batch_delay, and fail_job_batch_delay for throughput tuning.
  • worker_recovery, heartbeat_interval, sweep_interval, sweep_threshold, and recovery_delay for dead worker recovery.
  • listen_os_shutdown_signals, shutdown_signal, shutdown_grace_period, and shutdown_interrupted_job_retry_delay for shutdown control.

use graphile_worker::{
    IntoTaskHandlerResult, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        Ok::<(), String>(())
    }
}

let worker = WorkerOptions::default()
    .database_url("postgres://postgres:postgres@localhost/postgres")
    .schema("graphile_worker")
    .concurrency(8)
    .poll_interval(Duration::from_millis(500))
    .define_job::<SendEmail>()
    .init()
    .await?;

worker.run().await?;

Task Handlers

Task handler APIs live in graphile_worker_task_handler and are re-exported by graphile_worker.

Important public types:

  • TaskHandler - trait for a single job payload type.
  • IntoTaskHandlerResult - conversion trait for task return values such as () and Result<(), E>.
  • BatchTaskHandler - trait for JSON-array batch payloads.
  • BatchTaskResult and IntoBatchTaskHandlerResult - batch completion and partial failure results.
  • JobDefinition - reusable task registration value.
  • TaskHandlerOutcome and TaskHandlerFn - type-erased handler output and function type.
  • run_task_from_worker_ctx - helper that deserializes and runs a TaskHandler from WorkerContext.

Batch handlers operate on Vec<Self> item payloads. BatchTaskResult::ItemResults must return one result for each input item, in the same order; failed items are retried as a replacement JSON-array payload.

use graphile_worker::{
    BatchTaskHandler, IntoBatchTaskHandlerResult, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Deserialize, Serialize)]
struct IndexRecord {
    id: i64,
}

impl BatchTaskHandler for IndexRecord {
    const IDENTIFIER: &'static str = "index_record";

    async fn run_batch(
        items: Vec<Self>,
        _ctx: WorkerContext,
    ) -> impl IntoBatchTaskHandlerResult {
        let results = items
            .into_iter()
            .map(|item| {
                if item.id > 0 {
                    Ok(())
                } else {
                    Err("invalid id")
                }
            })
            .collect::<Vec<_>>();

        results
    }
}

let options = WorkerOptions::default().define_batch_job::<IndexRecord>();
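
The ordering rule for item results can be made concrete with a small stand-alone model. This is a sketch under stated assumptions: failed_items is a hypothetical helper, not a graphile_worker API, and it only illustrates how positional results map back to the items that would form the retry payload.

```rust
/// Hypothetical helper, not part of graphile_worker: pairs each input item with
/// its result and returns the items that would make up the retry payload.
fn failed_items<T: Clone, E>(items: &[T], results: &[Result<(), E>]) -> Result<Vec<T>, String> {
    // One result per input item is required; a length mismatch is a handler bug.
    if items.len() != results.len() {
        return Err(format!(
            "expected {} results, got {}",
            items.len(),
            results.len()
        ));
    }
    // Results are positional, so zipping keeps each item next to its own result.
    Ok(items
        .iter()
        .zip(results)
        .filter(|(_, result)| result.is_err())
        .map(|(item, _)| item.clone())
        .collect())
}
```

With items [1, -2, 3, 0] and the "id > 0" rule from the handler above, the retry payload in this model is [-2, 0]. Returning results in a different order would attach failures to the wrong items, which is why the order must match the input.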

Job Scheduling And Management

Use WorkerUtils to add, update, and maintain jobs without running a worker loop in the same code path.

Important public types:

  • WorkerUtils - client for scheduling and maintenance operations.
  • JobSpec and JobSpecBuilder - optional job parameters.
  • JobKeyMode - job-key behavior: Replace, PreserveRunAt, or UnsafeDedupe.
  • RawJobSpec - raw batch scheduling input.
  • Job, JobBuilder, DbJob, and DbJobData - job records returned by scheduling and management APIs.
  • CleanupTask - cleanup operations from graphile_worker_utils::types.
  • RescheduleJobOptions - optional fields for rescheduling jobs.

Common WorkerUtils methods include:

  • add_job, add_raw_job, add_jobs, add_raw_jobs, and add_batch_job.
  • remove_job, complete_jobs, permanently_fail_jobs, and reschedule_jobs.
  • list_active_workers, sweep_stale_workers, sweep_stale_workers_with_config, and force_unlock_workers.
  • cleanup and migrate.

use graphile_worker::{JobKeyMode, JobSpec, WorkerUtils};
use serde_json::json;

let spec = JobSpec::builder()
    .queue_name("email")
    .job_key("welcome-email:user-123")
    .job_key_mode(JobKeyMode::PreserveRunAt)
    .max_attempts(5)
    .priority(-10)
    .build();

let job = utils
    .add_raw_job(
        "send_email",
        json!({ "to": "user@example.com" }),
        spec,
    )
    .await?;
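
The three JobKeyMode variants differ in what happens when a job with the same key already exists. The sketch below is an in-memory model, assuming the modes behave as the equivalent job_key_mode values are documented for Node Graphile Worker. KeyMode, QueuedJob, and add_keyed_job are hypothetical names, and locked jobs are deliberately left out of the model.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical stand-in for JobKeyMode.
#[derive(Clone, Copy)]
enum KeyMode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

/// Hypothetical, simplified job record; run_at is a plain number for brevity.
#[derive(Debug, Clone, PartialEq)]
struct QueuedJob {
    payload: String,
    run_at: u64,
}

/// Adds a keyed job to an in-memory map, applying the assumed mode semantics.
fn add_keyed_job(queue: &mut HashMap<String, QueuedJob>, key: &str, new_job: QueuedJob, mode: KeyMode) {
    match queue.entry(key.to_string()) {
        // No job with this key yet: every mode simply inserts.
        Entry::Vacant(slot) => {
            slot.insert(new_job);
        }
        Entry::Occupied(mut slot) => match mode {
            // Replace: overwrite everything, including run_at.
            KeyMode::Replace => {
                slot.insert(new_job);
            }
            // PreserveRunAt: overwrite the job but keep the existing run_at.
            KeyMode::PreserveRunAt => {
                let run_at = slot.get().run_at;
                slot.insert(QueuedJob { run_at, ..new_job });
            }
            // UnsafeDedupe: leave the existing job untouched.
            KeyMode::UnsafeDedupe => {}
        },
    }
}
```

In this model PreserveRunAt suits debounce-style updates where the payload should change but the original schedule should hold, while UnsafeDedupe drops the new request entirely.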

Worker Context And Extensions

Context APIs live in graphile_worker_ctx and are re-exported by graphile_worker.

Important public types:

  • WorkerContext - runtime context passed to task handlers.
  • WorkerContextBuilder - builder for creating contexts.
  • WorkerContextExt - root-crate extension helpers for worker context.
  • TaskDetails and SharedTaskDetails - task identifier mappings.
  • Extensions and ReadOnlyExtensions from graphile_worker_extensions - typed application state storage.

Add shared state with WorkerOptions::add_extension. Task handlers receive a WorkerContext and can use the context APIs documented in the generated docs.

Cron

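Cron support is exposed from the root crate and split across three crates:

  • graphile_worker_crontab_types - crontab value types and utilities.
  • graphile_worker_crontab_parser - parser for crontab configuration.
  • graphile_worker_crontab_runner - runner that schedules crontab entries.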

The root crate also re-exports:

  • Cron and CronBuilder for typed schedules.
  • CronInput for values accepted by WorkerOptions::with_cron.
  • CronJobKeyMode, which is the crontab JobKeyMode re-exported under a distinct root-crate name.

use graphile_worker::{Cron, CrontabFill, WorkerOptions};

let options = WorkerOptions::default()
    .define_job::<SendEmail>()
    .with_cron(
        Cron::daily_at::<SendEmail>(8, 0)?
            .fill(CrontabFill::hours(1)),
    );

let options = WorkerOptions::default()
    .with_cron("0 8 * * * send_email")?;

Database And Migrations

The database abstraction lives in graphile_worker_database.

Important public types:

  • Database, DatabaseDriver, DbTransaction, and TransactionDriver.
  • DbExecutor and DbExecutorArg.
  • DbError.
  • Schema.
  • Notification and NotificationStream.
  • DbCell, DbParams, DbRow, DbValue, and FromDbCell.
  • escape_identifier.

Driver modules are feature gated:

  • graphile_worker_database::sqlx with the driver-sqlx feature.
  • graphile_worker_database::tokio_postgres with the driver-tokio-postgres feature.

Migrations live in graphile_worker_migrations.

Important public items:

  • migrate - runs Graphile Worker schema migrations.
  • MigrateError - migration error type.
  • GraphileWorkerMigration - migration metadata type.
  • pg_version and sql modules.

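The migration support crates are also public:

  • graphile_worker_migrations_core - core migration types used by the migrations crate.
  • graphile_worker_migrations_macros - procedural macros used by the migrations crate.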

Lifecycle Hooks

Lifecycle hook APIs live in graphile_worker_lifecycle_hooks and are re-exported by graphile_worker.

Important public types:

  • HookRegistry - registry for hook handlers.
  • Plugin - plugin trait for registering hooks.
  • Event, HookOutput, and Interceptable - hook event traits.
  • TypeErasedHooks - erased hook registry.
  • HookResult and event-specific result types such as JobScheduleResult.
  • Event marker and context types exported from the crate's events and context modules.

Workers can register individual handlers with WorkerOptions::on or register a plugin with WorkerOptions::add_plugin.

use graphile_worker::{HookResult, JobStart, WorkerOptions};

let options = WorkerOptions::default()
    .on(JobStart, |ctx| async move {
        println!("starting job {}", ctx.job.id());
    })
    .on(graphile_worker::BeforeJobRun, |ctx| async move {
        if ctx.payload.get("skip").and_then(|value| value.as_bool()) == Some(true) {
            return HookResult::Skip;
        }

        HookResult::Continue
    });

Recovery And Shutdown

Recovery APIs are exposed from graphile_worker_recovery and re-exported by the root crate where they are part of the worker API.

Important public types:

  • WorkerRecoveryConfig - dead worker recovery settings.
  • SweepStaleWorkersOptions and SweepStaleWorkersResult - manual sweep input and output.
  • ResolvedSweepConfig - resolved sweep settings.
  • ActiveWorkerRow - heartbeat row returned by worker listing APIs.
  • INFRASTRUCTURE_RESILIENT_FLAG and job_has_resilient_flag - infrastructure-resilient job flag support.

Shutdown signal support lives in graphile_worker_shutdown_signal:

  • ShutdownSignal - cloneable shared future.
  • shutdown_signal - OS shutdown signal detector.

Runtime

graphile_worker_runtime provides runtime-neutral async building blocks used by the worker internals and exposed as a public crate.

Important public items:

  • channel, Receiver, and Sender.
  • Mutex, RwLock, RwLockReadGuard, and RwLockWriteGuard.
  • Notify and Notified.
  • spawn, AbortHandle, JoinHandle, and JoinError.
  • interval, sleep, sleep_until, timeout_at, Interval, and TimeoutError.

The crate requires either the runtime-tokio or runtime-async-std feature.

Admin Crates

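Admin APIs are published as separate crates:

  • graphile_worker_admin_api - shared admin API contracts and queries.
  • graphile_worker_admin_ui - embedded Leptos admin UI and server-side assets.
  • graphile_worker_admin_ui_client - Leptos WASM client for the embedded admin UI.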

Feature Flags

The root crate default feature set is:

default = ["runtime-tokio", "tls-rustls", "driver-sqlx"]

Runtime features:

  • runtime-tokio
  • runtime-async-std

TLS features:

  • tls-rustls
  • tls-native-tls

Database driver features:

  • driver-sqlx
  • driver-tokio-postgres

OpenTelemetry compatibility features:

  • opentelemetry_0_30
  • opentelemetry_0_31
  • opentelemetry_0_32

Lower-Level Crates

These crates are public for advanced integration and internal composition, but most applications use them through graphile_worker re-exports. The Crate Map lists each one with its purpose.

Compatibility

Graphile Worker RS is a Rust implementation of the PostgreSQL-backed job queue used by Node Graphile Worker. It is designed to use the same database schema and can run side by side with Node workers when both runtimes understand the schema revision installed in PostgreSQL.

This page focuses on database and operational compatibility. Rust task handler code is not portable to Node.js, and Node task functions are not portable to Rust; the compatibility boundary is the jobs stored in PostgreSQL.

What Is Shared

Graphile Worker RS manages a PostgreSQL schema, graphile_worker by default. The schema stores queued jobs, task identifiers, queue locks, migration state, and optional worker recovery state.

The current migrations install the same public-facing objects in the graphile_worker schema that applications typically interact with:

  • add_job(...) schedules one job.
  • add_jobs(...) schedules many jobs.
  • remove_job(...) removes a keyed job.
  • complete_jobs(...), reschedule_jobs(...), permanently_fail_jobs(...), and force_unlock_workers(...) administer jobs.
  • jobs is a compatibility view over the private job tables.
  • migrations is the migration ledger.

Internally, newer migrations store queue data in private tables such as _private_jobs, _private_tasks, and _private_job_queues. Application code should prefer the public SQL functions and the jobs view instead of depending on the private tables directly.

Coexisting With Node Workers

Rust and Node workers can process jobs from the same schema when they register handlers for the same task identifiers and agree on the payload shape.

For example, a job scheduled through SQL can be picked up by any compatible worker that has a handler for send_email:

select graphile_worker.add_job(
    identifier => 'send_email',
    payload => '{"to":"ada@example.com","subject":"Welcome"}'::json,
    queue_name => 'mailers',
    run_at => now(),
    max_attempts => 25,
    job_key => 'welcome:ada@example.com',
    priority => 0,
    flags => null,
    job_key_mode => 'replace'
);

The matching Rust handler must use the same task identifier:

use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        Ok::<(), String>(())
    }
}

The job payload is stored as JSON. Keep payloads language-neutral: use stable field names, avoid Rust-only serialization assumptions, and make nullable or optional fields explicit when both Rust and Node workers may consume the same task.

Schema Selection

The default schema is graphile_worker, but Graphile Worker RS can be pointed at another schema during worker setup:

let worker = graphile_worker::WorkerOptions::default()
    .schema("graphile_worker") // the default; pass another name to use a different schema
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Use the same schema name when you want Rust and Node workers to share a queue. Use separate schemas when you want isolation, for example during a staged migration or when two applications have unrelated task identifiers.

Important Differences

Graphile Worker RS is mostly compatible with Node Graphile Worker, but it is not the same process model or API surface.

The README documents one runtime-level difference: Node Graphile Worker gives each process its own worker_id; Graphile Worker RS uses one worker id and processes jobs within the async runtime according to the configured concurrency.

The Rust crate exposes Rust-specific APIs such as WorkerOptions, TaskHandler, WorkerUtils, runtime feature selection, typed job definitions, and optional recovery configuration. Those are Rust conveniences around the shared database model, not Node APIs.

Graphile Worker RS also adds recovery support in the current schema through _private_workers and SQL functions such as worker_heartbeat, worker_deregister, list_stale_workers, list_orphan_locked_workers, recover_dead_worker_jobs, and delete_stale_workers. Recovery is opt-in in the Rust worker configuration. If you run mixed Rust and Node workers, enable and test recovery behavior deliberately so that every worker process in the shared schema is compatible with your operational expectations.

Migration Behavior

Graphile Worker RS runs ordered migrations and records them in graphile_worker.migrations. The migration set is idempotent: tests install a fresh schema, add a job, rerun migrations multiple times, and verify that the job remains present.

The migration code can also take over an existing schema that already has a Graphile Worker-style migrations table. Tests cover a pre-existing migrations table containing migration 1, then run the Rust migrations, add a job, and rerun migrations without losing that job.

There are two compatibility checks to plan for:

  • If the database contains a breaking migration id newer than this Rust crate knows about, migration aborts instead of downgrading or guessing. Newer non-breaking migration ids log a warning and continue.
  • Migration 11 refuses to run while recent jobs are locked. Tests set locked_at and locked_by on an existing job and expect the migration to fail with a locked-job error.
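
The first compatibility check can be sketched as a pure function. This is an illustrative model, not the crate's migration code: MigrationCheck and check_installed are hypothetical names, and the sketch assumes each ledger row can be reduced to a migration id plus a flag saying whether that migration is breaking.

```rust
/// Hypothetical outcome of comparing the ledger with the migrations this crate knows.
#[derive(Debug, PartialEq)]
enum MigrationCheck {
    /// Safe to continue; carries one warning per unknown non-breaking migration.
    Proceed(Vec<String>),
    /// A newer breaking migration with this id is installed; do not migrate.
    Abort(u32),
}

/// `installed` holds (migration id, is_breaking) pairs read from the ledger.
fn check_installed(latest_known: u32, installed: &[(u32, bool)]) -> MigrationCheck {
    let mut warnings = Vec::new();
    for &(id, breaking) in installed {
        // Migrations this crate already knows about need no special handling.
        if id <= latest_known {
            continue;
        }
        // An unknown breaking migration means the schema may have changed shape.
        if breaking {
            return MigrationCheck::Abort(id);
        }
        warnings.push(format!("migration {} is newer than this crate knows about", id));
    }
    MigrationCheck::Proceed(warnings)
}
```

The asymmetry is deliberate: an unknown additive migration is tolerated with a warning, while an unknown breaking one stops the run rather than guessing at the schema.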

For production coexistence, treat schema upgrades as a coordinated operation:

  1. Check that every Rust and Node worker version you plan to run supports the target database schema revision.
  2. Drain or stop workers before migrations that rewrite job tables or private structures.
  3. Confirm there are no active locks before crossing migration 11.
  4. Start one runtime first, schedule a simple job, and verify the other runtime can see or process jobs with the same task identifier before moving real traffic.

Migration Strategies

For gradual migration from Node to Rust, prefer one of these approaches:

  • Shared schema: keep Node workers running, add Rust workers with handlers for a small set of task identifiers, and move task ownership incrementally.
  • Separate schema: run Rust workers in a different schema while validating new task implementations, then move scheduling code to the shared schema when the payload and retry behavior are proven.
  • SQL scheduling boundary: schedule jobs through graphile_worker.add_job or add_jobs from application code so the producer does not depend on either runtime's client API.

When sharing a schema, avoid registering different behavior for the same task identifier in both runtimes unless the handlers are intentionally equivalent. PostgreSQL stores the task identifier, not the language runtime that should own the job.

Troubleshooting

This page collects common setup, runtime, and documentation issues with concrete checks you can run before digging into application-specific code.

Database Connection Problems

Graphile Worker RS needs a PostgreSQL connection. Most examples use a SQLx pool and most local checks use DATABASE_URL.

Check that the URL is set and points at PostgreSQL:

echo "$DATABASE_URL"

For a local throwaway database that matches the test setup, the repository uses PostgreSQL on port 54233:

docker run -d --name graphile-worker-rs-test \
  -p 54233:5432 \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_DB=postgres \
  postgres

Then use:

export DATABASE_URL='postgres://postgres:postgres@localhost:54233/postgres'

If a command cannot connect, verify that the container is ready:

docker exec graphile-worker-rs-test pg_isready -U postgres -h localhost
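
The same check can run inside the application before it builds a pool. The following is a minimal sketch using only the standard library; check_database_url is a hypothetical helper, and it only checks that the value is present and uses a PostgreSQL URL scheme, not that the server is reachable.

```rust
/// Hypothetical pre-flight check: confirms the value looks like a PostgreSQL URL.
/// The URL itself is kept out of error messages because it may contain a password.
fn check_database_url(value: Option<&str>) -> Result<&str, String> {
    let url = value.ok_or_else(|| "DATABASE_URL is not set".to_string())?;
    if url.starts_with("postgres://") || url.starts_with("postgresql://") {
        Ok(url)
    } else {
        Err("DATABASE_URL does not start with postgres:// or postgresql://".to_string())
    }
}
```

A caller would typically pass std::env::var("DATABASE_URL").ok().as_deref() and stop at startup when the result is an error.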

Migrations Have Not Run

If jobs are not visible, worker startup fails, or CLI commands fail against an empty database, run the Graphile Worker migrations for the target database.

Using the CLI:

graphile-worker --database-url postgres://postgres:postgres@localhost/postgres migrate

Or with DATABASE_URL:

DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker migrate

By default, the CLI uses the graphile_worker schema. See Migrations and CLI for more operational detail.

Runtime Or Feature Mismatches

The default crate features are Tokio, rustls TLS, and the SQLx driver:

graphile_worker = { version = "0.13" }

If you disable default features, choose one runtime and one supported driver. For async-std with SQLx, include the async-std runtime feature and a TLS feature:

graphile_worker = { version = "0.13", default-features = false, features = ["runtime-async-std", "driver-sqlx", "tls-rustls"] }
async-std = { version = "1", features = ["attributes"] }

For Tokio with the tokio-postgres driver:

graphile_worker = { version = "0.13", default-features = false, features = ["runtime-tokio", "driver-tokio-postgres"] }

The repository test matrix only runs driver-tokio-postgres with runtime-tokio. If you see feature errors, compare your feature list with Runtime and Drivers and Features.

Worker Starts But A Job Does Not Run

First check that the task identifier used when adding the job exactly matches the registered handler:

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        Ok::<(), String>(())
    }
}

The worker must register that handler before init():

let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Then add jobs with the same identifier or with the type-safe helper:

worker
    .create_utils()
    .add_raw_job(
        "send_email",
        serde_json::json!({ "to": "user@example.com" }),
        Default::default(),
    )
    .await?;

If jobs are scheduled for the future, they will not run until their run_at time. If jobs share a queue name, jobs in that queue run in series rather than in parallel. See Tasks, Scheduling, and Queues.
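
The two scheduling rules in the previous paragraph can be modeled in a few lines. This is a simplified sketch, not the worker's real fetch query: PendingJob and runnable_now are hypothetical, times are plain numbers, and priority, attempts, and locking details are ignored.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified view of a queued job.
struct PendingJob {
    id: u64,
    queue_name: Option<&'static str>,
    run_at: u64,
}

/// Returns the ids of jobs that could start at `now`, given queues that are
/// already busy running a job.
fn runnable_now(jobs: &[PendingJob], now: u64, busy_queues: &HashSet<&str>) -> Vec<u64> {
    let mut claimed: HashSet<&str> = busy_queues.clone();
    let mut ids = Vec::new();
    for job in jobs {
        // Jobs scheduled for the future wait until their run_at time.
        if job.run_at > now {
            continue;
        }
        // A named queue runs one job at a time; insert returns false when the
        // queue has already been claimed.
        if let Some(queue) = job.queue_name {
            if !claimed.insert(queue) {
                continue;
            }
        }
        ids.push(job.id);
    }
    ids
}
```

If a job you expect to run is missing from the result in this model, it is either scheduled for later or waiting behind another job in the same named queue. Jobs without a queue name are not serialized against each other.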

Payload Deserialization Failures

Task payloads are deserialized into the task struct. If the payload shape does not match the struct, the handler cannot receive the data you expect.

Check the struct fields and the JSON payload side by side:

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
    body: String,
}

SELECT graphile_worker.add_job(
    'send_email',
    json_build_object(
        'to', 'user@example.com',
        'subject', 'Welcome',
        'body', 'Thanks for signing up.'
    )
);

When adding jobs from Rust, prefer the type-safe add_job path when the task type is available:

utils.add_job(
    SendEmail {
        to: "user@example.com".to_string(),
        subject: "Welcome".to_string(),
        body: "Thanks for signing up.".to_string(),
    },
    Default::default(),
).await?;

Shutdown Signal Conflicts

By default, Graphile Worker installs OS-level shutdown listeners such as SIGINT and SIGTERM so it can drain gracefully. If your application already owns shutdown handling, disable the built-in listeners and pass your own shutdown future:

// on_shutdown() stands in for your application's own shutdown future.
let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(false)
    .shutdown_signal(on_shutdown());

let worker = WorkerOptions::default()
    .worker_shutdown(shutdown)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

See Shutdown for the configuration surface.

Locked Jobs After A Crash

Worker recovery is disabled by default. For deployments where jobs may remain locked after a process crash, network partition, forced abort, or orchestrator shutdown, enable recovery:

let recovery = WorkerRecoveryConfig::default()
    .enabled(true)
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30));

let worker = WorkerOptions::default()
    .worker_recovery(recovery)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

The CLI can also inspect and recover stale workers:

graphile-worker sweep-stale-workers --dry-run
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s

Use --dry-run first when you want to see which workers would be recovered without unlocking or rescheduling jobs. See Recovery.
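
To reason about which workers a sweep would pick, it can help to model the threshold check on its own. The sketch below is an assumption-laden illustration, not the recovery crate's logic: Heartbeat and stale_workers are hypothetical, and a worker is treated as stale when its last heartbeat is older than the sweep threshold.

```rust
use std::time::Duration;

/// Hypothetical heartbeat record: how long ago a worker last reported in.
struct Heartbeat {
    worker_id: &'static str,
    age: Duration,
}

/// Models a dry run: lists workers whose heartbeat is older than the threshold
/// without unlocking or rescheduling anything.
fn stale_workers(heartbeats: &[Heartbeat], sweep_threshold: Duration) -> Vec<&'static str> {
    heartbeats
        .iter()
        .filter(|heartbeat| heartbeat.age > sweep_threshold)
        .map(|heartbeat| heartbeat.worker_id)
        .collect()
}
```

Under this model the threshold should be comfortably larger than the heartbeat interval, as in the 30 second interval and 300 second threshold shown earlier, so a healthy worker that misses a single heartbeat is not reported as stale.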

CLI Checks

The installed binary is graphile-worker. It connects with --database-url or DATABASE_URL.

Useful checks:

graphile-worker list --state ready
graphile-worker show 123
graphile-worker stats
graphile-worker queues
graphile-worker workers

Useful repair commands:

graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker cleanup

Use the same database URL and schema that your worker uses, otherwise the CLI will inspect a different queue.

Local Test Environment Checks

The repository has two common test paths:

just test

This runs cargo test --all and requires DATABASE_URL to point at a usable PostgreSQL database.

just test-docker

This starts a postgres container named graphile-worker-rs-test, waits for pg_isready, runs tests with DATABASE_URL='postgres://postgres:postgres@localhost:54233/postgres', and then removes the container.

If Docker reports that the container name or port is already in use, remove the old test container or stop the process using port 54233:

docker rm -f graphile-worker-rs-test

Runtime-specific test helpers also exist:

just test-docker-runtime runtime-tokio driver-sqlx
just test-docker-runtime runtime-async-std driver-sqlx
just test-docker-runtime runtime-tokio driver-tokio-postgres

Documentation Build Checks

Documentation commands are defined in the repository justfile:

just docs-install
just docs
just docs-serve

just docs-install installs mdbook with version ^0.4. just docs runs mdbook build docs. just docs-serve serves the book on 127.0.0.1:3000 by default.